CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_2_9_HLT1_bphpatch4/src/EventFilter/ShmBuffer/src/FUShmBuffer.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUShmBuffer
00004 // -----------
00005 //
00006 //            15/11/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013 
00014 #include <unistd.h>
00015 #include <iostream>
00016 #include <string>
00017 #include <cassert>
00018 
00019 #include <sstream>
00020 #include <fstream>
00021 
00022 #include <cstdlib>
00023 #include <cstring>
00024 #include <stdint.h>
00025 
00026 // The shmem keys are henceforth going to be FIXED for a given userid.
00027 // Prior to creation, the application will attempt to get ownership
00028 // of existing segments by the same key and destroy them.
00029 
00030 #define SHM_DESCRIPTOR_KEYID           1 /* Id used on ftok for 1. shmget key */
00031 #define SHM_KEYID                      2 /* Id used on ftok for 2. shmget key */
00032 #define SEM_KEYID                      1 /* Id used on ftok for semget key    */
00033 
00034 #define NSKIP_MAX                    100 /* NOTE(review): not referenced in this part of the file; purpose to be confirmed */
00035 
00036 
00037 using namespace std;
00038 using namespace evf;
00039 
00040 
00041 //obsolete!!!
00042 const char* FUShmBuffer::shmKeyPath_ =
00043   (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045   (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00046 
00047 
00049 // construction/destruction
00051 
00052 //______________________________________________________________________________
00053 FUShmBuffer::FUShmBuffer(bool         segmentationMode,
00054                          unsigned int nRawCells,
00055                          unsigned int nRecoCells,
00056                          unsigned int nDqmCells,
00057                          unsigned int rawCellSize,
00058                          unsigned int recoCellSize,
00059                          unsigned int dqmCellSize)
00060   : segmentationMode_(segmentationMode)
00061   , nClientsMax_(128)
00062   , nRawCells_(nRawCells)
00063   , rawCellPayloadSize_(rawCellSize)
00064   , nRecoCells_(nRecoCells)
00065   , recoCellPayloadSize_(recoCellSize)
00066   , nDqmCells_(nDqmCells)
00067   , dqmCellPayloadSize_(dqmCellSize)
00068 {
00069   rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00070   recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00071   dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00072 
00073   void* addr;
00074   
00075   rawWriteOffset_=sizeof(FUShmBuffer);
00076   addr=(void*)((unsigned long)this+rawWriteOffset_);
00077   new (addr) unsigned int[nRawCells_];
00078   
00079   rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00080   addr=(void*)((unsigned long)this+rawReadOffset_);
00081   new (addr) unsigned int[nRawCells_];
00082  
00083   recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00084   addr=(void*)((unsigned long)this+recoWriteOffset_);
00085   new (addr) unsigned int[nRecoCells_];
00086 
00087   recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00088   addr=(void*)((unsigned long)this+recoReadOffset_);
00089   new (addr) unsigned int[nRecoCells_];
00090 
00091   dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00092   addr=(void*)((unsigned long)this+dqmWriteOffset_);
00093   new (addr) unsigned int[nDqmCells_];
00094 
00095   dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00096   addr=(void*)((unsigned long)this+dqmReadOffset_);
00097   new (addr) unsigned int[nDqmCells_];
00098 
00099   evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00100   addr=(void*)((unsigned long)this+evtStateOffset_);
00101   new (addr) evt::State_t[nRawCells_];
00102   
00103   evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00104   addr=(void*)((unsigned long)this+evtDiscardOffset_);
00105   new (addr) unsigned int[nRawCells_];
00106   
00107   evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00108   addr=(void*)((unsigned long)this+evtNumberOffset_);
00109   new (addr) unsigned int[nRawCells_];
00110   
00111   evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00112   addr=(void*)((unsigned long)this+evtPrcIdOffset_);
00113   new (addr) pid_t[nRawCells_];
00114   
00115   evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00116   addr=(void*)((unsigned long)this+evtTimeStampOffset_);
00117   new (addr) time_t[nRawCells_];
00118   
00119   dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00120   addr=(void*)((unsigned long)this+dqmStateOffset_);
00121   new (addr) dqm::State_t[nDqmCells_];
00122   
00123   clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00124   addr=(void*)((unsigned long)this+clientPrcIdOffset_);
00125   new (addr) pid_t[nClientsMax_];
00126 
00127   rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00128   
00129   if (segmentationMode_) {
00130     recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00131     dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00132     addr=(void*)((unsigned long)this+rawCellOffset_);
00133     new (addr) key_t[nRawCells_];
00134     addr=(void*)((unsigned long)this+recoCellOffset_);
00135     new (addr) key_t[nRecoCells_];
00136     addr=(void*)((unsigned long)this+dqmCellOffset_);
00137     new (addr) key_t[nDqmCells_];
00138   }
00139   else {
00140     recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00141     dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00142     for (unsigned int i=0;i<nRawCells_;i++) {
00143       addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
00144       new (addr) FUShmRawCell(rawCellSize);
00145     }
00146     for (unsigned int i=0;i<nRecoCells_;i++) {
00147       addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
00148       new (addr) FUShmRecoCell(recoCellSize);
00149     }
00150     for (unsigned int i=0;i<nDqmCells_;i++) {
00151       addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
00152       new (addr) FUShmDqmCell(dqmCellSize);
00153     }
00154   }
00155 }
00156 
00157 
00158 //______________________________________________________________________________
00159 FUShmBuffer::~FUShmBuffer()
00160 {
00161   
00162 }
00163 
00164 
00166 // implementation of member functions
00168 
00169 //______________________________________________________________________________
00170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00171 {
00172   shmid_=shmid;
00173   semid_=semid;
00174   
00175   if (segmentationMode_) {
00176     int    shmKeyId=666;
00177     key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
00178     for (unsigned int i=0;i<nRawCells_;i++) {
00179       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00180       int   shmid  =shm_create(*keyAddr,rawCellTotalSize_);
00181       void* shmAddr=shm_attach(shmid);
00182       new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00183       shmdt(shmAddr);
00184       ++keyAddr;
00185     }
00186     keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
00187     for (unsigned int i=0;i<nRecoCells_;i++) {
00188       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00189       int   shmid  =shm_create(*keyAddr,recoCellTotalSize_);
00190       void* shmAddr=shm_attach(shmid);
00191       new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00192       shmdt(shmAddr);
00193       ++keyAddr;
00194     }
00195     keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
00196     for (unsigned int i=0;i<nDqmCells_;i++) {
00197       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00198       int   shmid  =shm_create(*keyAddr,dqmCellTotalSize_);
00199       void* shmAddr=shm_attach(shmid);
00200       new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00201       shmdt(shmAddr);
00202       ++keyAddr;
00203     }
00204   }
00205   
00206   for (unsigned int i=0;i<nRawCells_;i++) {
00207     FUShmRawCell* cell=rawCell(i);
00208     cell->initialize(i);
00209     if (segmentationMode_) shmdt(cell);
00210   }
00211   
00212   for (unsigned int i=0;i<nRecoCells_;i++) {
00213     FUShmRecoCell* cell=recoCell(i);
00214     cell->initialize(i);
00215     if (segmentationMode_) shmdt(cell);
00216   }
00217   
00218   for (unsigned int i=0;i<nDqmCells_;i++) {
00219     FUShmDqmCell* cell=dqmCell(i);
00220     cell->initialize(i);
00221     if (segmentationMode_) shmdt(cell);
00222   }
00223 
00224   reset();
00225 }
00226 
00227 
00228 //______________________________________________________________________________
00229 void FUShmBuffer::reset()
00230 {
00231   nClients_=0;
00232 
00233   // setup ipc semaphores
00234   sem_init(0,1);          // lock (binary)
00235   sem_init(1,nRawCells_); // raw  write semaphore
00236   sem_init(2,0);          // raw  read  semaphore
00237   sem_init(3,1);          // binary semaphore to schedule raw event for discard
00238   sem_init(4,0);          // binary semaphore to discard raw event
00239   sem_init(5,nRecoCells_);// reco write semaphore
00240   sem_init(6,0);          // reco send (read) semaphore
00241   sem_init(7,nDqmCells_); // dqm  write semaphore
00242   sem_init(8,0);          // dqm  send (read) semaphore
00243 
00244   sem_print();
00245 
00246   unsigned int *iWrite,*iRead;
00247   
00248   rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00249   iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
00250   iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
00251   for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00252 
00253   recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00254   iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
00255   iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
00256   for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00257 
00258   dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00259   iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
00260   iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
00261   for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00262   
00263   for (unsigned int i=0;i<nRawCells_;i++) {
00264     setEvtState(i,evt::EMPTY);
00265     setEvtDiscard(i,0);
00266     setEvtNumber(i,0xffffffff);
00267     setEvtPrcId(i,0);
00268     setEvtTimeStamp(i,0);
00269   }
00270 
00271   for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00272 }
00273 
00274 
00275 //______________________________________________________________________________
00276 int FUShmBuffer::nbRawCellsToWrite() const
00277 {
00278   return semctl(semid(),1,GETVAL);
00279 }
00280 
00281 
00282 //______________________________________________________________________________
00283 int FUShmBuffer::nbRawCellsToRead() const
00284 {
00285   return semctl(semid(),2,GETVAL);
00286 }
00287 
00288 
00289 //______________________________________________________________________________
00290 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00291 {
00292   waitRawWrite();
00293   unsigned int  iCell=nextRawWriteIndex();
00294   FUShmRawCell* cell =rawCell(iCell);
00295   evt::State_t  state=evtState(iCell);
00296   assert(state==evt::EMPTY);
00297   setEvtState(iCell,evt::RAWWRITING);
00298   setEvtDiscard(iCell,1);
00299   return cell;
00300 }
00301 
00302 
00303 //______________________________________________________________________________
00304 FUShmRawCell* FUShmBuffer::rawCellToRead()
00305 {
00306   waitRawRead();
00307   unsigned int iCell=nextRawReadIndex();
00308   FUShmRawCell* cell=rawCell(iCell);
00309   evt::State_t  state=evtState(iCell);
00310   assert(state==evt::RAWWRITTEN
00311          ||state==evt::EMPTY
00312          ||state==evt::STOP
00313          ||state==evt::LUMISECTION);
00314   if (state==evt::RAWWRITTEN) {
00315     setEvtPrcId(iCell,getpid());
00316     setEvtState(iCell,evt::RAWREADING);
00317   }
00318   return cell;
00319 }
00320 
00321 
00322 //______________________________________________________________________________
00323 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00324 {
00325   waitRecoRead();
00326   unsigned int   iCell   =nextRecoReadIndex();
00327   FUShmRecoCell* cell    =recoCell(iCell);
00328   unsigned int   iRawCell=cell->rawCellIndex();
00329   if (iRawCell<nRawCells_) {
00330     //evt::State_t   state=evtState(iRawCell);
00331     //assert(state==evt::RECOWRITTEN);
00332     setEvtState(iRawCell,evt::SENDING);
00333   }
00334   return cell;
00335 }
00336 
00337 
00338 //______________________________________________________________________________
00339 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00340 {
00341   waitDqmRead();
00342   unsigned int  iCell=nextDqmReadIndex();
00343   FUShmDqmCell* cell=dqmCell(iCell);
00344   dqm::State_t  state=dqmState(iCell);
00345   assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00346   if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00347   return cell;
00348 }
00349 
00350 
00351 //______________________________________________________________________________
00352 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00353 {
00354   waitRawDiscarded();
00355   FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00356   evt::State_t  state=evtState(cell->index());
00357   assert(state==evt::PROCESSED
00358          ||state==evt::SENT
00359          ||state==evt::EMPTY
00360          ||state==evt::STOP
00361          ||state==evt::USEDLS);
00362   if (state!=evt::EMPTY 
00363       && state!=evt::USEDLS
00364       && state!=evt::STOP
00365       ) 
00366     setEvtState(cell->index(),evt::DISCARDING);
00367   return cell;
00368 }
00369 
00370 
00371 //______________________________________________________________________________
00372 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00373 {
00374   evt::State_t state=evtState(cell->index());
00375   assert(state==evt::RAWWRITING);
00376   setEvtState(cell->index(),evt::RAWWRITTEN);
00377   setEvtNumber(cell->index(),cell->evtNumber());
00378   postRawIndexToRead(cell->index());
00379   if (segmentationMode_) shmdt(cell);
00380   postRawRead();
00381 }
00382 
00383 
00384 //______________________________________________________________________________
00385 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00386 {
00387   evt::State_t state=evtState(cell->index());
00388   assert(state==evt::RAWREADING);
00389   setEvtState(cell->index(),evt::RAWREAD);
00390   setEvtState(cell->index(),evt::PROCESSING);
00391   setEvtTimeStamp(cell->index(),time(0));
00392   if (segmentationMode_) shmdt(cell);
00393 }
00394 
00395 
00396 //______________________________________________________________________________
00397 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00398 {
00399   unsigned int iRawCell=cell->rawCellIndex();
00400   if (iRawCell<nRawCells_) {
00401     //evt::State_t state=evtState(cell->rawCellIndex());
00402     //assert(state==evt::SENDING);
00403     setEvtState(cell->rawCellIndex(),evt::SENT);
00404   }
00405   if (segmentationMode_) shmdt(cell);
00406 }
00407 
00408 
00409 //______________________________________________________________________________
00410 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00411 {
00412   dqm::State_t state=dqmState(cell->index());
00413   assert(state==dqm::SENDING||state==dqm::EMPTY);
00414   if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00415   if (segmentationMode_) shmdt(cell);
00416 }
00417 
00418 
00419 //______________________________________________________________________________
00420 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00421 {
00422   waitRawDiscard();
00423   if (rawCellReadyForDiscard(iCell)) {
00424     rawDiscardIndex_=iCell;
00425     evt::State_t  state=evtState(iCell);
00426     assert(state==evt::PROCESSING
00427            ||state==evt::SENT
00428            ||state==evt::EMPTY
00429            ||state==evt::STOP
00430            ||state==evt::LUMISECTION
00431            );
00432     if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00433     if (state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00434     postRawDiscarded();
00435   }
00436   else postRawDiscard();
00437 }
00438 
00439 
00440 //______________________________________________________________________________
00441 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell)
00442 {
00443   waitRawDiscard();
00444   if (rawCellReadyForDiscard(iCell)) {
00445     rawDiscardIndex_=iCell;
00446     evt::State_t  state=evtState(iCell);
00447     if(state!=evt::LUMISECTION && state!=evt::EMPTY && state!=evt::USEDLS) setEvtState(iCell,evt::PROCESSED);
00448     if(state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00449     postRawDiscarded();
00450   }
00451   else postRawDiscard();
00452 }
00453 
00454 
00455 //______________________________________________________________________________
00456 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00457 {
00458   releaseRawCell(cell);
00459   postRawDiscard();
00460 }
00461 
00462 
00463 //______________________________________________________________________________
00464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00465 {
00466   FUShmRecoCell* cell=recoCell(iCell);
00467   unsigned int iRawCell=cell->rawCellIndex();
00468   if (iRawCell<nRawCells_) {
00469     //evt::State_t state=evtState(iRawCell);
00470     //assert(state==evt::SENT);
00471     scheduleRawCellForDiscard(iRawCell);
00472   }
00473   cell->clear();
00474   if (segmentationMode_) shmdt(cell);
00475   postRecoIndexToWrite(iCell);
00476   postRecoWrite();
00477 }
00478 
00479 //______________________________________________________________________________
00480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
00481 {
00482   FUShmRecoCell* cell=recoCell(iCell);
00483   cell->clear();
00484   if (segmentationMode_) shmdt(cell);
00485   postRecoIndexToWrite(iCell);
00486   postRecoWrite();
00487 }
00488 
00489 
00490 //______________________________________________________________________________
00491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00492 {
00493   dqm::State_t state=dqmState(iCell);
00494   assert(state==dqm::EMPTY||state==dqm::SENT);
00495   setDqmState(iCell,dqm::DISCARDING);
00496   FUShmDqmCell* cell=dqmCell(iCell);
00497   cell->clear();
00498   if (segmentationMode_) shmdt(cell);
00499   setDqmState(iCell,dqm::EMPTY);
00500   postDqmIndexToWrite(iCell);
00501   postDqmWrite();
00502 }
00503 
00504 
00505 //______________________________________________________________________________
00506 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00507 {
00508   evt::State_t state=evtState(cell->index());
00509   if(!(
00510      state==evt::DISCARDING
00511      ||state==evt::RAWWRITING
00512      ||state==evt::EMPTY
00513      ||state==evt::STOP
00514      //     ||state==evt::LUMISECTION
00515      ||state==evt::USEDLS
00516      )) std::cout << "=================releaseRawCell state " << state 
00517                   << std::endl;
00518 
00519   assert(
00520          state==evt::DISCARDING
00521          ||state==evt::RAWWRITING
00522          ||state==evt::EMPTY
00523          ||state==evt::STOP
00524          //      ||state==evt::LUMISECTION
00525          ||state==evt::USEDLS
00526          );
00527   setEvtState(cell->index(),evt::EMPTY);
00528   setEvtDiscard(cell->index(),0);
00529   setEvtNumber(cell->index(),0xffffffff);
00530   setEvtPrcId(cell->index(),0);
00531   setEvtTimeStamp(cell->index(),0);
00532   cell->clear();
00533   postRawIndexToWrite(cell->index());
00534   if (segmentationMode_) shmdt(cell);
00535   postRawWrite();
00536 }
00537 
00538 
00539 //______________________________________________________________________________
00540 void FUShmBuffer::writeRawEmptyEvent()
00541 {
00542   FUShmRawCell* cell=rawCellToWrite();
00543   evt::State_t state=evtState(cell->index());
00544   assert(state==evt::RAWWRITING);
00545   setEvtState(cell->index(),evt::STOP);
00546   cell->setEventTypeStopper();
00547   postRawIndexToRead(cell->index());
00548   if (segmentationMode_) shmdt(cell);
00549   postRawRead();
00550 }
00551 
00552 //______________________________________________________________________________
00553 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls)
00554 {
00555   FUShmRawCell* cell=rawCellToWrite();
00556   cell->setLumiSection(ls);
00557   evt::State_t state=evtState(cell->index());
00558   assert(state==evt::RAWWRITING);
00559   setEvtState(cell->index(),evt::LUMISECTION);
00560   cell->setEventTypeEol();
00561   postRawIndexToRead(cell->index());
00562   if (segmentationMode_) shmdt(cell);
00563   postRawRead();
00564 }
00565 
00566 
00567 //______________________________________________________________________________
00568 void FUShmBuffer::writeRecoEmptyEvent()
00569 {
00570   waitRecoWrite();
00571   unsigned int   iCell=nextRecoWriteIndex();
00572   FUShmRecoCell* cell =recoCell(iCell);
00573   cell->clear();
00574   postRecoIndexToRead(iCell);
00575   if (segmentationMode_) shmdt(cell);
00576   postRecoRead();
00577 }
00578 
00579 
00580 //______________________________________________________________________________
00581 void FUShmBuffer::writeDqmEmptyEvent()
00582 {
00583   waitDqmWrite();
00584   unsigned int  iCell=nextDqmWriteIndex();
00585   FUShmDqmCell* cell=dqmCell(iCell);
00586   cell->clear();
00587   postDqmIndexToRead(iCell);
00588   if (segmentationMode_) shmdt(cell);
00589   postDqmRead();
00590 }
00591 
00592 
00593 //______________________________________________________________________________
00594 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00595 {
00596   FUShmRawCell* cell=rawCellToWrite();
00597   rawDiscardIndex_=cell->index();
00598   setEvtState(cell->index(),evt::STOP);
00599   cell->setEventTypeStopper();
00600   setEvtNumber(cell->index(),0xffffffff);
00601   setEvtPrcId(cell->index(),0);
00602   setEvtTimeStamp(cell->index(),0);
00603   postRawDiscarded();
00604 }
00605 
00606 
00607 //______________________________________________________________________________
00608 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00609 {
00610   waitRawDiscard();
00611   if (rawCellReadyForDiscard(cell->index())) {
00612     rawDiscardIndex_=cell->index();
00613     // as this function is called by the reader, the state and type should 
00614     // already be correct
00615     //    setEvtState(cell->index(),evt::STOP);
00616     //    cell->setEventType(evt::STOPPER);
00617     //    setEvtNumber(cell->index(),0xffffffff);
00618     //    setEvtPrcId(cell->index(),0);
00619     //    setEvtTimeStamp(cell->index(),0);
00620     removeClientPrcId(getpid());
00621     if (segmentationMode_) shmdt(cell);
00622     postRawDiscarded();
00623   }
00624   else postRawDiscard();
00625 }
00626 
00627 //______________________________________________________________________________
00628 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell)
00629 {
00630   //  waitRawDiscard();
00631   if (rawCellReadyForDiscard(cell->index())) {
00632     rawDiscardIndex_=cell->index();
00633     //    setEvtState(cell->index(),evt::STOP);
00634     //    cell->setEventType(evt::STOPPER);
00635     //    setEvtNumber(cell->index(),0xffffffff);
00636     //    setEvtPrcId(cell->index(),0);
00637     //    setEvtTimeStamp(cell->index(),0);
00638     //    removeClientPrcId(getpid());
00639     if (segmentationMode_) shmdt(cell);
00640     postRawDiscarded();
00641   }
00642   else postRawDiscard();
00643 }
00644 
00645 
00646 //______________________________________________________________________________
00647 bool FUShmBuffer::writeRecoInitMsg(unsigned int   outModId,
00648                                    unsigned int   fuProcessId,
00649                                    unsigned int   fuGuid,
00650                                    unsigned char *data,
00651                                    unsigned int   dataSize)
00652 {
00653   if (dataSize>recoCellPayloadSize_) {
00654     cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00655     return false;
00656   }
00657   
00658   waitRecoWrite();
00659   unsigned int   iCell=nextRecoWriteIndex();
00660   FUShmRecoCell* cell =recoCell(iCell);
00661   cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00662   postRecoIndexToRead(iCell);
00663   if (segmentationMode_) shmdt(cell);
00664   postRecoRead();
00665   return true;
00666 }
00667 
00668 
00669 //______________________________________________________________________________
00670 bool FUShmBuffer::writeRecoEventData(unsigned int   runNumber,
00671                                      unsigned int   evtNumber,
00672                                      unsigned int   outModId,
00673                                      unsigned int   fuProcessId,
00674                                      unsigned int   fuGuid,
00675                                      unsigned char *data,
00676                                      unsigned int   dataSize)
00677 {
00678   if (dataSize>recoCellPayloadSize_) {
00679     cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00680     return false;
00681   }
00682   
00683   waitRecoWrite();
00684   unsigned int   iCell=nextRecoWriteIndex();
00685   FUShmRecoCell* cell =recoCell(iCell);
00686   unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00687   //evt::State_t state=evtState(rawCellIndex);
00688   //assert(state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT);
00689   setEvtState(rawCellIndex,evt::RECOWRITING);
00690   incEvtDiscard(rawCellIndex);
00691   cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00692                        fuProcessId,fuGuid,data,dataSize);
00693   setEvtState(rawCellIndex,evt::RECOWRITTEN);
00694   postRecoIndexToRead(iCell);
00695   if (segmentationMode_) shmdt(cell);
00696   postRecoRead();
00697   return true;
00698 }
00699 
00700 
00701 //______________________________________________________________________________
00702 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00703                                       unsigned int fuProcessId,
00704                                       unsigned int iRawCell)
00705 {
00706   FUShmRawCell *raw=rawCell(iRawCell);
00707 
00708   unsigned int   dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00709   unsigned char *data    =new unsigned char[dataSize];
00710   uint32_t      *pos     =(uint32_t*)data;
00711   // 06-Oct-2008, KAB - added a version number for the error event format.
00712   //
00713   // Version 1 had no version number, so the run number appeared in the
00714   // first value.  So, a reasonable test for version 1 is whether the
00715   // first value is larger than some relatively small cutoff (say 32).
00716   // Version 2 added the lumi block number.
00717   //
00718   *pos++=(uint32_t)2;  // protocol version number
00719   *pos++=(uint32_t)runNumber;
00720   *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00721   *pos++=(uint32_t)raw->evtNumber();
00722   for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00723   memcpy(pos,raw->payloadAddr(),raw->eventSize());
00724   
00725   // DEBUG
00726   /*
00727     if (1) {
00728     stringstream ss;
00729     ss<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".err";
00730     ofstream fout;
00731     fout.open(ss.str().c_str(),ios::out|ios::binary);
00732     if (!fout.write((char*)data,dataSize))
00733     cout<<"Failed to write error event to "<<ss.str()<<endl;
00734     fout.close();
00735     
00736     stringstream ss2;
00737     ss2<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".info";
00738     ofstream fout2;
00739     fout2.open(ss2.str().c_str());
00740     fout2<<"dataSize = "<<dataSize<<endl;
00741     fout2<<"runNumber = "<<runNumber<<endl;
00742     fout2<<"evtNumber = "<<raw->evtNumber()<<endl;
00743     fout2<<"eventSize = "<<raw->eventSize()<<endl;
00744     unsigned int totalSize(0);
00745     for (unsigned int i=0;i<1024;i++) {
00746     unsigned int fedSize = raw->fedSize(i);
00747     totalSize += fedSize;
00748     if (fedSize>0) fout2<<i<<": "<<fedSize<<endl;
00749     }
00750     fout2<<"totalSize = "<<totalSize<<endl;
00751     fout2.close();
00752     }
00753   */// END DEBUG
00754   
00755   waitRecoWrite();
00756   unsigned int   iRecoCell=nextRecoWriteIndex();
00757   FUShmRecoCell* reco     =recoCell(iRecoCell);
00758   setEvtState(iRawCell,evt::RECOWRITING);
00759   setEvtDiscard(iRawCell,1);
00760   reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00761                         data,dataSize);
00762   delete [] data;
00763   setEvtState(iRawCell,evt::RECOWRITTEN);
00764   postRecoIndexToRead(iRecoCell);
00765   if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00766   postRecoRead();
00767   return true;
00768 }
00769 
00770 
00771 //______________________________________________________________________________
00772 bool FUShmBuffer::writeDqmEventData(unsigned int   runNumber,
00773                                     unsigned int   evtAtUpdate,
00774                                     unsigned int   folderId,
00775                                     unsigned int   fuProcessId,
00776                                     unsigned int   fuGuid,
00777                                     unsigned char *data,
00778                                     unsigned int   dataSize)
00779 {
00780   if (dataSize>dqmCellPayloadSize_) {
00781     cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00782     return false;
00783   }
00784   
00785   waitDqmWrite();
00786   unsigned int  iCell=nextDqmWriteIndex();
00787   FUShmDqmCell* cell=dqmCell(iCell);
00788   dqm::State_t state=dqmState(iCell);
00789   assert(state==dqm::EMPTY);
00790   setDqmState(iCell,dqm::WRITING);
00791   cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00792   setDqmState(iCell,dqm::WRITTEN);
00793   postDqmIndexToRead(iCell);
00794   if (segmentationMode_) shmdt(cell);
00795   postDqmRead();
00796   return true;
00797 }
00798 
00799 
00800 //______________________________________________________________________________
00801 void FUShmBuffer::sem_print()
00802 {
00803   cout<<"--> current sem values:"
00804       <<endl
00805       <<" lock="<<semctl(semid(),0,GETVAL)
00806       <<endl
00807       <<" wraw="<<semctl(semid(),1,GETVAL)
00808       <<" rraw="<<semctl(semid(),2,GETVAL)
00809       <<endl
00810       <<" wdsc="<<semctl(semid(),3,GETVAL)
00811       <<" rdsc="<<semctl(semid(),4,GETVAL)
00812       <<endl
00813       <<" wrec="<<semctl(semid(),5,GETVAL)
00814       <<" rrec="<<semctl(semid(),6,GETVAL)
00815       <<endl
00816       <<" wdqm="<<semctl(semid(),7,GETVAL)
00817       <<" rdqm="<<semctl(semid(),8,GETVAL)
00818       <<endl;
00819 }
00820 
00821 
00822 //______________________________________________________________________________
00823 void FUShmBuffer::printEvtState(unsigned int index)
00824 {
00825   evt::State_t state=evtState(index);
00826   std::string stateName;
00827   if      (state==evt::EMPTY)      stateName="EMPTY";
00828   else if (state==evt::STOP)       stateName="STOP";
00829   else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00830   else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00831   else if (state==evt::RAWREADING) stateName="RAWREADING";
00832   else if (state==evt::RAWREAD)    stateName="RAWREAD";
00833   else if (state==evt::PROCESSING) stateName="PROCESSING";
00834   else if (state==evt::PROCESSED)  stateName="PROCESSED";
00835   else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00836   else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00837   else if (state==evt::SENDING)    stateName="SENDING";
00838   else if (state==evt::SENT)       stateName="SENT";
00839   else if (state==evt::DISCARDING) stateName="DISCARDING";
00840   cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00841 }
00842 
00843 
00844 //______________________________________________________________________________
00845 void FUShmBuffer::printDqmState(unsigned int index)
00846 {
00847   dqm::State_t state=dqmState(index);
00848   cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00849 }
00850 
00851 
00852 //______________________________________________________________________________
00853 FUShmBuffer* FUShmBuffer::createShmBuffer(bool         segmentationMode,
00854                                           unsigned int nRawCells,
00855                                           unsigned int nRecoCells,
00856                                           unsigned int nDqmCells,
00857                                           unsigned int rawCellSize,
00858                                           unsigned int recoCellSize,
00859                                           unsigned int dqmCellSize)
00860 {
00861   // if necessary, release shared memory first!
00862   if (FUShmBuffer::releaseSharedMemory())
00863     cout<<"FUShmBuffer::createShmBuffer: "
00864         <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00865         <<endl;
00866   
00867   // create bookkeeping shared memory segment
00868   int  size =sizeof(unsigned int)*7;
00869   int  shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00870   void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00871   
00872   if(1!=shm_nattch(shmid)) {
00873     cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00874     shmdt(shmAddr);
00875     return 0;
00876   }
00877   
00878   unsigned int* p=(unsigned int*)shmAddr;
00879   *p++=segmentationMode;
00880   *p++=nRawCells;
00881   *p++=nRecoCells;
00882   *p++=nDqmCells;
00883   *p++=rawCellSize;
00884   *p++=recoCellSize;
00885   *p++=dqmCellSize;
00886   shmdt(shmAddr);
00887   
00888   // create the 'real' shared memory buffer
00889   size     =FUShmBuffer::size(segmentationMode,
00890                               nRawCells,nRecoCells,nDqmCells,
00891                               rawCellSize,recoCellSize,dqmCellSize);
00892   shmid    =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00893   int semid=sem_create(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00894   shmAddr  =shm_attach(shmid);                         if (0==shmAddr) return 0;
00895   
00896   if (1!=shm_nattch(shmid)) {
00897     cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00898     shmdt(shmAddr);
00899     return 0;
00900   }
00901   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00902                                                nRawCells,nRecoCells,nDqmCells,
00903                                                rawCellSize,recoCellSize,dqmCellSize);
00904   
00905   cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00906   cout<<"                                segmentationMode="<<segmentationMode<<endl;
00907   
00908   buffer->initialize(shmid,semid);
00909   
00910   return buffer;
00911 }
00912 
00913 
00914 //______________________________________________________________________________
00915 FUShmBuffer* FUShmBuffer::getShmBuffer()
00916 {
00917   // get bookkeeping shared memory segment
00918   int   size   =sizeof(unsigned int)*7;
00919   int   shmid  =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00920   void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00921   
00922   unsigned int *p=(unsigned int*)shmAddr;
00923   bool          segmentationMode=*p++;
00924   unsigned int  nRawCells       =*p++;
00925   unsigned int  nRecoCells      =*p++;
00926   unsigned int  nDqmCells       =*p++;
00927   unsigned int  rawCellSize     =*p++;
00928   unsigned int  recoCellSize    =*p++;
00929   unsigned int  dqmCellSize     =*p++;
00930   shmdt(shmAddr);
00931 
00932   cout<<"FUShmBuffer::getShmBuffer():"
00933       <<" segmentationMode="<<segmentationMode
00934       <<" nRawCells="<<nRawCells
00935       <<" nRecoCells="<<nRecoCells
00936       <<" nDqmCells="<<nDqmCells
00937       <<" rawCellSize="<<rawCellSize
00938       <<" recoCellSize="<<recoCellSize
00939       <<" dqmCellSize="<<dqmCellSize
00940       <<endl;
00941   
00942   // get the 'real' shared memory buffer
00943   size     =FUShmBuffer::size(segmentationMode,
00944                               nRawCells,nRecoCells,nDqmCells,
00945                               rawCellSize,recoCellSize,dqmCellSize);
00946   shmid    =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00947   int semid=sem_get(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00948   shmAddr  =shm_attach(shmid);                      if (0==shmAddr) return 0;
00949   
00950   if (0==shm_nattch(shmid)) {
00951     cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00952     return 0;
00953   }
00954   
00955   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00956                                                nRawCells,nRecoCells,nDqmCells,
00957                                                rawCellSize,recoCellSize,dqmCellSize);
00958   
00959   cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00960   cout<<"                             segmentationMode="<<segmentationMode<<endl;
00961   
00962   buffer->setClientPrcId(getpid());
00963 
00964   return buffer;
00965 }
00966 
00967 
00968 //______________________________________________________________________________
00969 bool FUShmBuffer::releaseSharedMemory()
00970 {
00971   // get bookkeeping shared memory segment
00972   int   size   =sizeof(unsigned int)*7;
00973   int   shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00974   void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00975 
00976   unsigned int*p=(unsigned int*)shmAddr;
00977   bool         segmentationMode=*p++;
00978   unsigned int nRawCells       =*p++;
00979   unsigned int nRecoCells      =*p++;
00980   unsigned int nDqmCells       =*p++;
00981   unsigned int rawCellSize     =*p++;
00982   unsigned int recoCellSize    =*p++;
00983   unsigned int dqmCellSize     =*p++;
00984   shmdt(shmAddr);
00985 
00986   
00987   // get the 'real' shared memory segment
00988   size     =FUShmBuffer::size(segmentationMode,
00989                               nRawCells,nRecoCells,nDqmCells,
00990                               rawCellSize,recoCellSize,dqmCellSize);
00991   int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0)    return false;
00992   int semid=sem_get(FUShmBuffer::getSemKey(),9);   if (semid<0)    return false;
00993   shmAddr  =shm_attach(shmid);                     if (0==shmAddr) return false; 
00994 
00995   int att = 0;
00996   for(; att <10; att++)
00997     {
00998       if(shm_nattch(shmid)>1) {
00999         cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
01000              <<", failed attempt to release shared memory."<<endl;
01001 	::sleep(1);
01002       }
01003       else
01004         break;
01005     }
01006 
01007   if(att>=10) return false;
01008 
01009   if (segmentationMode) {
01010     FUShmBuffer* buffer=
01011       new (shmAddr) FUShmBuffer(segmentationMode,
01012                                 nRawCells,nRecoCells,nDqmCells,
01013                                 rawCellSize,recoCellSize,dqmCellSize);
01014     int cellid;
01015     for (unsigned int i=0;i<nRawCells;i++) {
01016       cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
01017       if ((shm_destroy(cellid)==-1)) return false;
01018     }
01019     for (unsigned int i=0;i<nRecoCells;i++) {
01020       cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
01021       if ((shm_destroy(cellid)==-1)) return false;
01022     }
01023     for (unsigned int i=0;i<nDqmCells;i++) {
01024       cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
01025       if ((shm_destroy(cellid)==-1)) return false;
01026     }
01027   }
01028   shmdt(shmAddr);
01029   if (sem_destroy(semid)==-1)  return false;
01030   if (shm_destroy(shmid)==-1)  return false;
01031   if (shm_destroy(shmidd)==-1) return false;
01032 
01033   return true;
01034 }
01035 
01036 
01037 //______________________________________________________________________________
01038 unsigned int FUShmBuffer::size(bool         segmentationMode,
01039                                unsigned int nRawCells,
01040                                unsigned int nRecoCells,
01041                                unsigned int nDqmCells,
01042                                unsigned int rawCellSize,
01043                                unsigned int recoCellSize,
01044                                unsigned int dqmCellSize)
01045 {
01046   unsigned int offset=
01047     sizeof(FUShmBuffer)+
01048     sizeof(unsigned int)*4*nRawCells+
01049     sizeof(evt::State_t)*nRawCells+
01050     sizeof(dqm::State_t)*nDqmCells;
01051   
01052   unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
01053   unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
01054   unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
01055   
01056   unsigned int realSize =
01057     (segmentationMode) ?
01058     offset
01059     +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
01060     :
01061     offset
01062     +rawCellTotalSize*nRawCells
01063     +recoCellTotalSize*nRecoCells
01064     +dqmCellTotalSize*nDqmCells;
01065 
01066   unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
01067   
01068   return result;
01069 }
01070 
01071 
01072 //______________________________________________________________________________
01073 key_t FUShmBuffer::getShmDescriptorKey()
01074 {
01075   key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
01076   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
01077                              <<"for file "<<shmKeyPath_<<"!"<<endl;
01078   return result;
01079 }
01080 
01081 
01082 //______________________________________________________________________________
01083 key_t FUShmBuffer::getShmKey()
01084 {
01085   key_t result=getuid()*1000+SHM_KEYID;
01086   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
01087                              <<"for file "<<shmKeyPath_<<"!"<<endl;
01088   return result;
01089 }
01090 
01091 
01092 //______________________________________________________________________________
01093 key_t FUShmBuffer::getSemKey()
01094 {
01095   key_t result=getuid()*1000+SEM_KEYID;
01096   if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
01097                              <<"for file "<<semKeyPath_<<"!"<<endl;
01098   return result;
01099 }
01100 
01101 
01102 //______________________________________________________________________________
01103 int FUShmBuffer::shm_create(key_t key,int size)
01104 {
01105   // first check and possibly remove existing segment with same id
01106   int shmid=shmget(key,1,0644);//using minimal size any segment with key "key" will be connected
01107   if(shmid!=-1){
01108     // an existing segment was found, remove it
01109     shmid_ds shmstat;
01110     int result=shmctl(shmid,IPC_STAT,&shmstat);
01111     cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01112          << " created by process " << shmstat.shm_cpid << " owned by "
01113          << shmstat.shm_perm.uid << " permissions " 
01114          << hex << shmstat.shm_perm.mode << dec << endl;
01115     result=shmctl(shmid,IPC_RMID,&shmstat);
01116   }
01117   shmid=shmget(key,size,IPC_CREAT|0644);
01118   if (shmid==-1) {
01119     int err=errno;
01120     cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01121         <<strerror(err)<<endl;
01122   }
01123   return shmid;
01124 }
01125 
01126 
01127 //______________________________________________________________________________
01128 int FUShmBuffer::shm_get(key_t key,int size)
01129 {
01130   int shmid=shmget(key,size,0644);
01131   if (shmid==-1) {
01132     int err=errno;
01133     cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01134         <<strerror(err)<<endl;
01135   }
01136   return shmid;
01137 }
01138 
01139 
01140 //______________________________________________________________________________
01141 void* FUShmBuffer::shm_attach(int shmid)
01142 {
01143   void* result=shmat(shmid,NULL,0);
01144   if (0==result) {
01145     int err=errno;
01146     cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01147         <<strerror(err)<<endl;
01148   }
01149   return result;
01150 }
01151 
01152 
01153 //______________________________________________________________________________
01154 int FUShmBuffer::shm_nattch(int shmid)
01155 {
01156   shmid_ds shmstat;
01157   shmctl(shmid,IPC_STAT,&shmstat);
01158   return shmstat.shm_nattch;
01159 }
01160 
01161 
01162 //______________________________________________________________________________
01163 int FUShmBuffer::shm_destroy(int shmid)
01164 {
01165   shmid_ds shmstat;
01166   int result=shmctl(shmid,IPC_RMID,&shmstat);
01167   if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01168   return result;
01169 }
01170 
01171 
01172 //______________________________________________________________________________
01173 int FUShmBuffer::sem_create(key_t key,int nsem)
01174 {
01175   int semid=semget(key,nsem,IPC_CREAT|0666);
01176   if (semid<0) {
01177     int err=errno;
01178     cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01179         <<strerror(err)<<endl;
01180   }
01181   return semid;
01182 }
01183 
01184 
01185 //______________________________________________________________________________
01186 int FUShmBuffer::sem_get(key_t key,int nsem)
01187 {
01188   int semid=semget(key,nsem,0666);
01189   if (semid<0) {
01190     int err=errno;
01191     cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01192         <<strerror(err)<<endl;
01193   }
01194   return semid;
01195 }
01196 
01197 
01198 //______________________________________________________________________________
01199 int FUShmBuffer::sem_destroy(int semid)
01200 {
01201   int result=semctl(semid,0,IPC_RMID);
01202   if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01203   return result;
01204 }
01205 
01206 
01207 
01209 // implementation of private member functions
01211 
01212 //______________________________________________________________________________
01213 unsigned int FUShmBuffer::nextIndex(unsigned int  offset,
01214                                     unsigned int  nCells,
01215                                     unsigned int& iNext)
01216 {
01217   lock();
01218   unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01219   pindex+=iNext;
01220   iNext=(iNext+1)%nCells;
01221   unsigned int result=*pindex;
01222   unlock();
01223   return result;
01224 }
01225 
01226 
01227 //______________________________________________________________________________
01228 void FUShmBuffer::postIndex(unsigned int  index,
01229                             unsigned int  offset,
01230                             unsigned int  nCells,
01231                             unsigned int& iLast)
01232 {
01233   lock();
01234   unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01235   pindex+=iLast;
01236   *pindex=index;
01237   iLast=(iLast+1)%nCells;
01238   unlock();
01239 }
01240 
01241 
01242 //______________________________________________________________________________
01243 unsigned int FUShmBuffer::nextRawWriteIndex()
01244 {
01245   return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01246 }
01247 
01248 
01249 //______________________________________________________________________________
01250 unsigned int FUShmBuffer::nextRawReadIndex()
01251 {
01252   return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01253 }
01254 
01255 
01256 //______________________________________________________________________________
01257 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01258 {
01259   postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01260 }
01261 
01262 
01263 //______________________________________________________________________________
01264 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01265 {
01266   postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01267 }
01268 
01269 
01270 //______________________________________________________________________________
01271 unsigned int FUShmBuffer::nextRecoWriteIndex()
01272 {
01273   return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01274 }
01275 
01276 
01277 //______________________________________________________________________________
01278 unsigned int FUShmBuffer::nextRecoReadIndex()
01279 {
01280   return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01281 }
01282 
01283 
01284 //______________________________________________________________________________
01285 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01286 {
01287   postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01288 }
01289 
01290 
01291 //______________________________________________________________________________
01292 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01293 {
01294   postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01295 }
01296 
01297 
01298 //______________________________________________________________________________
01299 unsigned int FUShmBuffer::nextDqmWriteIndex()
01300 {
01301   return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01302 }
01303 
01304 
01305 //______________________________________________________________________________
01306 unsigned int FUShmBuffer::nextDqmReadIndex()
01307 {
01308   return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01309 }
01310 
01311 
01312 //______________________________________________________________________________
01313 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01314 {
01315   postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01316 }
01317 
01318 
01319 //______________________________________________________________________________
01320 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01321 {
01322   postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01323 }
01324 
01325 
01326 //______________________________________________________________________________
01327 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01328 {
01329   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01330   for (unsigned int i=0;i<nRawCells_;i++) {
01331     if ((*pevt++)==evtNumber) return i;
01332   }
01333   assert(false);
01334   return 0xffffffff;
01335 }
01336 
01337 
01338 //______________________________________________________________________________
01339 evt::State_t FUShmBuffer::evtState(unsigned int index)
01340 {
01341   assert(index<nRawCells_);
01342   evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01343   pstate+=index;
01344   return *pstate;
01345 }
01346 
01347 
01348 //______________________________________________________________________________
01349 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01350 {
01351   assert(index<nDqmCells_);
01352   dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01353   pstate+=index;
01354   return *pstate;
01355 }
01356 
01357 
01358 //______________________________________________________________________________
01359 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01360 {
01361   assert(index<nRawCells_);
01362   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01363   pevt+=index;
01364   return *pevt;
01365 }
01366 
01367 
01368 //______________________________________________________________________________
01369 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01370 {
01371   assert(index<nRawCells_);
01372   pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01373   prcid+=index;
01374   return *prcid;
01375 }
01376 
01377 
01378 //______________________________________________________________________________
01379 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01380 {
01381   assert(index<nRawCells_);
01382   time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01383   ptstmp+=index;
01384   return *ptstmp;
01385 }
01386 
01387 
01388 //______________________________________________________________________________
01389 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01390 {
01391   assert(index<nClientsMax_);
01392   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01393   prcid+=index;
01394   return *prcid;
01395 }
01396 
01397 
01398 //______________________________________________________________________________
01399 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01400 {
01401   assert(index<nRawCells_);
01402   evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01403   pstate+=index;
01404   lock();
01405   *pstate=state;
01406   unlock();
01407   return true;
01408 }
01409 
01410 
01411 //______________________________________________________________________________
01412 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01413 {
01414   assert(index<nDqmCells_);
01415   dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01416   pstate+=index;
01417   lock();
01418   *pstate=state;
01419   unlock();
01420   return true;
01421 }
01422 
01423 
01424 //______________________________________________________________________________
01425 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01426 {
01427   assert(index<nRawCells_);
01428   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01429   pcount+=index;
01430   lock();
01431   *pcount=discard;
01432   unlock();
01433   return true;
01434 }
01435 
01436 
01437 //______________________________________________________________________________
01438 int FUShmBuffer::incEvtDiscard(unsigned int index)
01439 {
01440   int result = 0;
01441   assert(index<nRawCells_);
01442   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01443   pcount+=index;
01444   lock();
01445   (*pcount)++;
01446   result = *pcount;
01447   unlock();
01448   return result;
01449 }
01450 
01451 
01452 //______________________________________________________________________________
01453 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01454 {
01455   assert(index<nRawCells_);
01456   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01457   pevt+=index;
01458   lock();
01459   *pevt=evtNumber;
01460   unlock();
01461   return true;
01462 }
01463 
01464 
01465 //______________________________________________________________________________
01466 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01467 {
01468   assert(index<nRawCells_);
01469   pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01470   prcid+=index;
01471   lock();
01472   *prcid=prcId;
01473   unlock();
01474   return true;
01475 }
01476 
01477 
01478 //______________________________________________________________________________
01479 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01480 {
01481   assert(index<nRawCells_);
01482   time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01483   ptstmp+=index;
01484   lock();
01485   *ptstmp=timeStamp;
01486   unlock();
01487   return true;
01488 }
01489 
01490 
01491 //______________________________________________________________________________
01492 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01493 {
01494   lock();
01495   assert(nClients_<nClientsMax_);
01496   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01497   for (unsigned int i=0;i<nClients_;i++) {
01498     if ((*prcid)==prcId) { unlock();  return false; }
01499     prcid++;
01500   }
01501   nClients_++;
01502   *prcid=prcId;
01503   unlock();
01504   return true;
01505 }
01506 
01507 
01508 //______________________________________________________________________________
01509 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01510 {
01511   lock();
01512   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01513   unsigned int iClient(0);
01514   while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01515   assert(iClient!=nClients_);
01516   pid_t* next=prcid; next++;
01517   while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01518   nClients_--;
01519   unlock();
01520   return true;
01521 }
01522 
01523 
01524 //______________________________________________________________________________
01525 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01526 {
01527   FUShmRawCell* result(0);
01528   
01529   if (iCell>=nRawCells_) {
01530     cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01531         <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01532     return result;
01533   }
01534   
01535   if (segmentationMode_) {
01536     key_t         shmkey  =rawCellShmKey(iCell);
01537     int           shmid   =shm_get(shmkey,rawCellTotalSize_);
01538     void*         cellAddr=shm_attach(shmid);
01539     result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01540   }
01541   else {
01542     result=
01543       (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
01544   }
01545   
01546   return result;
01547 }
01548 
01549 
01550 //______________________________________________________________________________
01551 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01552 {
01553   FUShmRecoCell* result(0);
01554   
01555   if (iCell>=nRecoCells_) {
01556     cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01557         <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01558     return result;
01559   }
01560   
01561   if (segmentationMode_) {
01562     key_t         shmkey  =recoCellShmKey(iCell);
01563     int           shmid   =shm_get(shmkey,recoCellTotalSize_);
01564     void*         cellAddr=shm_attach(shmid);
01565     result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01566   }
01567   else {
01568     result=
01569       (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
01570   }
01571   
01572   return result;
01573 }
01574 
01575 
01576 //______________________________________________________________________________
01577 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01578 {
01579   FUShmDqmCell* result(0);
01580   
01581   if (iCell>=nDqmCells_) {
01582     cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01583         <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01584     return result;
01585   }
01586   
01587   if (segmentationMode_) {
01588     key_t         shmkey  =dqmCellShmKey(iCell);
01589     int           shmid   =shm_get(shmkey,dqmCellTotalSize_);
01590     void*         cellAddr=shm_attach(shmid);
01591     result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01592   }
01593   else {
01594     result=
01595       (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01596   }
01597   
01598   return result;
01599 }
01600 
01601 
01602 //______________________________________________________________________________
01603 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01604 {
01605   assert(index<nRawCells_);
01606   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01607   pcount+=index;
01608   lock();
01609   assert(*pcount>0);
01610   --(*pcount);
01611   bool result=(*pcount==0);
01612   unlock();
01613   return result;
01614 }
01615 
01616 
01617 //______________________________________________________________________________
01618 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01619 {
01620   if (!segmentationMode_) {
01621     cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01622     return -1;
01623   }
01624   key_t* addr=(key_t*)((unsigned long)this+offset);
01625   for (unsigned int i=0;i<iCell;i++) ++addr;
01626   return *addr;
01627 }
01628 
01629 
01630 //______________________________________________________________________________
01631 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01632 {
01633   if (iCell>=nRawCells_) {
01634     cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01635         <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01636     return -1;
01637   }
01638   return shmKey(iCell,rawCellOffset_);
01639 }
01640 
01641 
01642 //______________________________________________________________________________
01643 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01644 {
01645   if (iCell>=nRecoCells_) {
01646     cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01647         <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01648     return -1;
01649   }
01650   return shmKey(iCell,recoCellOffset_);
01651 }
01652 
01653 
01654 //______________________________________________________________________________
01655 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01656 {
01657   if (iCell>=nDqmCells_) {
01658     cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01659         <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01660     return -1;
01661   }
01662   return shmKey(iCell,dqmCellOffset_);
01663 }
01664 
01665 
01666 //______________________________________________________________________________
01667 void FUShmBuffer::sem_init(int isem,int value)
01668 {
01669   if (semctl(semid(),isem,SETVAL,value)<0) {
01670     cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01671   }
01672 }
01673 
01674 
01675 //______________________________________________________________________________
01676 int FUShmBuffer::sem_wait(int isem)
01677 {
01678   struct sembuf sops[1];
01679   sops[0].sem_num=isem;
01680   sops[0].sem_op =  -1;
01681   sops[0].sem_flg=   0;
01682   if (semop(semid(),sops,1)==-1) {
01683     cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01684     return -1;
01685   }
01686   return 0;
01687 }
01688 
01689 
01690 //______________________________________________________________________________
01691 void FUShmBuffer::sem_post(int isem)
01692 {
01693   struct sembuf sops[1];
01694   sops[0].sem_num=isem;
01695   sops[0].sem_op =   1;
01696   sops[0].sem_flg=   0;
01697   if (semop(semid(),sops,1)==-1) {
01698     cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01699   }
01700 }