CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch12/src/EventFilter/ShmBuffer/src/FUShmBuffer.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUShmBuffer
00004 // -----------
00005 //
00006 //            15/11/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013 
00014 #include <unistd.h>
00015 #include <iostream>
00016 #include <string>
00017 #include <cassert>
00018 
00019 #include <sstream>
00020 #include <fstream>
00021 
00022 #include <cstdlib>
00023 #include <cstring>
00024 #include <stdint.h>
00025 
00026 // the shmem keys are henceforth going to be FIXED for a give userid
00027 // prior to creation, the application will attempt to get ownership
00028 // of existing segments by the same key and destroy them
00029 
00030 #define SHM_DESCRIPTOR_KEYID           1 /* Id used on ftok for 1. shmget key */
00031 #define SHM_KEYID                      2 /* Id used on ftok for 2. shmget key */
00032 #define SEM_KEYID                      1 /* Id used on ftok for semget key    */
00033 
00034 #define NSKIP_MAX                    100
00035 
00036 
00037 using namespace std;
00038 using namespace evf;
00039 
00040 
00041 //obsolete!!!
00042 const char* FUShmBuffer::shmKeyPath_ =
00043   (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045   (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00046 
00047 
00049 // construction/destruction
00051 
00052 //______________________________________________________________________________
00053 FUShmBuffer::FUShmBuffer(bool         segmentationMode,
00054                          unsigned int nRawCells,
00055                          unsigned int nRecoCells,
00056                          unsigned int nDqmCells,
00057                          unsigned int rawCellSize,
00058                          unsigned int recoCellSize,
00059                          unsigned int dqmCellSize)
00060   : segmentationMode_(segmentationMode)
00061   , nClientsMax_(128)
00062   , nRawCells_(nRawCells)
00063   , rawCellPayloadSize_(rawCellSize)
00064   , nRecoCells_(nRecoCells)
00065   , recoCellPayloadSize_(recoCellSize)
00066   , nDqmCells_(nDqmCells)
00067   , dqmCellPayloadSize_(dqmCellSize)
00068 {
00069   rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00070   recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00071   dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00072 
00073   void* addr;
00074   
00075   rawWriteOffset_=sizeof(FUShmBuffer);
00076   addr=(void*)((unsigned long)this+rawWriteOffset_);
00077   new (addr) unsigned int[nRawCells_];
00078   
00079   rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00080   addr=(void*)((unsigned long)this+rawReadOffset_);
00081   new (addr) unsigned int[nRawCells_];
00082  
00083   recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00084   addr=(void*)((unsigned long)this+recoWriteOffset_);
00085   new (addr) unsigned int[nRecoCells_];
00086 
00087   recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00088   addr=(void*)((unsigned long)this+recoReadOffset_);
00089   new (addr) unsigned int[nRecoCells_];
00090 
00091   dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00092   addr=(void*)((unsigned long)this+dqmWriteOffset_);
00093   new (addr) unsigned int[nDqmCells_];
00094 
00095   dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00096   addr=(void*)((unsigned long)this+dqmReadOffset_);
00097   new (addr) unsigned int[nDqmCells_];
00098 
00099   evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00100   addr=(void*)((unsigned long)this+evtStateOffset_);
00101   new (addr) evt::State_t[nRawCells_];
00102   
00103   evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00104   addr=(void*)((unsigned long)this+evtDiscardOffset_);
00105   new (addr) unsigned int[nRawCells_];
00106   
00107   evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00108   addr=(void*)((unsigned long)this+evtNumberOffset_);
00109   new (addr) unsigned int[nRawCells_];
00110   
00111   evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00112   addr=(void*)((unsigned long)this+evtPrcIdOffset_);
00113   new (addr) pid_t[nRawCells_];
00114   
00115   evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00116   addr=(void*)((unsigned long)this+evtTimeStampOffset_);
00117   new (addr) time_t[nRawCells_];
00118   
00119   dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00120   addr=(void*)((unsigned long)this+dqmStateOffset_);
00121   new (addr) dqm::State_t[nDqmCells_];
00122   
00123   clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00124   addr=(void*)((unsigned long)this+clientPrcIdOffset_);
00125   new (addr) pid_t[nClientsMax_];
00126 
00127   rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00128   
00129   if (segmentationMode_) {
00130     recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00131     dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00132     addr=(void*)((unsigned long)this+rawCellOffset_);
00133     new (addr) key_t[nRawCells_];
00134     addr=(void*)((unsigned long)this+recoCellOffset_);
00135     new (addr) key_t[nRecoCells_];
00136     addr=(void*)((unsigned long)this+dqmCellOffset_);
00137     new (addr) key_t[nDqmCells_];
00138   }
00139   else {
00140     recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00141     dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00142     for (unsigned int i=0;i<nRawCells_;i++) {
00143       addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
00144       new (addr) FUShmRawCell(rawCellSize);
00145     }
00146     for (unsigned int i=0;i<nRecoCells_;i++) {
00147       addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
00148       new (addr) FUShmRecoCell(recoCellSize);
00149     }
00150     for (unsigned int i=0;i<nDqmCells_;i++) {
00151       addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
00152       new (addr) FUShmDqmCell(dqmCellSize);
00153     }
00154   }
00155 }
00156 
00157 
00158 //______________________________________________________________________________
00159 FUShmBuffer::~FUShmBuffer()
00160 {
00161   
00162 }
00163 
00164 
00166 // implementation of member functions
00168 
00169 //______________________________________________________________________________
00170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00171 {
00172   shmid_=shmid;
00173   semid_=semid;
00174   
00175   if (segmentationMode_) {
00176     int    shmKeyId=666;
00177     key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
00178     for (unsigned int i=0;i<nRawCells_;i++) {
00179       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00180       int   shmid  =shm_create(*keyAddr,rawCellTotalSize_);
00181       void* shmAddr=shm_attach(shmid);
00182       new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00183       shmdt(shmAddr);
00184       ++keyAddr;
00185     }
00186     keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
00187     for (unsigned int i=0;i<nRecoCells_;i++) {
00188       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00189       int   shmid  =shm_create(*keyAddr,recoCellTotalSize_);
00190       void* shmAddr=shm_attach(shmid);
00191       new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00192       shmdt(shmAddr);
00193       ++keyAddr;
00194     }
00195     keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
00196     for (unsigned int i=0;i<nDqmCells_;i++) {
00197       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00198       int   shmid  =shm_create(*keyAddr,dqmCellTotalSize_);
00199       void* shmAddr=shm_attach(shmid);
00200       new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00201       shmdt(shmAddr);
00202       ++keyAddr;
00203     }
00204   }
00205   
00206   for (unsigned int i=0;i<nRawCells_;i++) {
00207     FUShmRawCell* cell=rawCell(i);
00208     cell->initialize(i);
00209     if (segmentationMode_) shmdt(cell);
00210   }
00211   
00212   for (unsigned int i=0;i<nRecoCells_;i++) {
00213     FUShmRecoCell* cell=recoCell(i);
00214     cell->initialize(i);
00215     if (segmentationMode_) shmdt(cell);
00216   }
00217   
00218   for (unsigned int i=0;i<nDqmCells_;i++) {
00219     FUShmDqmCell* cell=dqmCell(i);
00220     cell->initialize(i);
00221     if (segmentationMode_) shmdt(cell);
00222   }
00223 
00224   reset();
00225 }
00226 
00227 
00228 //______________________________________________________________________________
00229 void FUShmBuffer::reset()
00230 {
00231   nClients_=0;
00232 
00233   // setup ipc semaphores
00234   sem_init(0,1);          // lock (binary)
00235   sem_init(1,nRawCells_); // raw  write semaphore
00236   sem_init(2,0);          // raw  read  semaphore
00237   sem_init(3,1);          // binary semaphore to schedule raw event for discard
00238   sem_init(4,0);          // binary semaphore to discard raw event
00239   sem_init(5,nRecoCells_);// reco write semaphore
00240   sem_init(6,0);          // reco send (read) semaphore
00241   sem_init(7,nDqmCells_); // dqm  write semaphore
00242   sem_init(8,0);          // dqm  send (read) semaphore
00243 
00244   sem_print();
00245 
00246   unsigned int *iWrite,*iRead;
00247   
00248   rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00249   iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
00250   iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
00251   for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00252 
00253   recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00254   iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
00255   iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
00256   for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00257 
00258   dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00259   iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
00260   iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
00261   for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00262   
00263   for (unsigned int i=0;i<nRawCells_;i++) {
00264     setEvtState(i,evt::EMPTY);
00265     setEvtDiscard(i,0);
00266     setEvtNumber(i,0xffffffff);
00267     setEvtPrcId(i,0);
00268     setEvtTimeStamp(i,0);
00269   }
00270 
00271   for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00272 }
00273 
00274 
00275 //______________________________________________________________________________
00276 int FUShmBuffer::nbRawCellsToWrite() const
00277 {
00278   return semctl(semid(),1,GETVAL);
00279 }
00280 
00281 
00282 //______________________________________________________________________________
00283 int FUShmBuffer::nbRawCellsToRead() const
00284 {
00285   return semctl(semid(),2,GETVAL);
00286 }
00287 
00288 
00289 //______________________________________________________________________________
00290 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00291 {
00292   waitRawWrite();
00293   unsigned int  iCell=nextRawWriteIndex();
00294   FUShmRawCell* cell =rawCell(iCell);
00295   evt::State_t  state=evtState(iCell);
00296   assert(state==evt::EMPTY);
00297   setEvtState(iCell,evt::RAWWRITING);
00298   setEvtDiscard(iCell,1);
00299   return cell;
00300 }
00301 
00302 
00303 //______________________________________________________________________________
00304 FUShmRawCell* FUShmBuffer::rawCellToRead()
00305 {
00306   waitRawRead();
00307   unsigned int iCell=nextRawReadIndex();
00308   FUShmRawCell* cell=rawCell(iCell);
00309   evt::State_t  state=evtState(iCell);
00310   assert(state==evt::RAWWRITTEN
00311          ||state==evt::EMPTY
00312          ||state==evt::STOP
00313          ||state==evt::LUMISECTION);
00314   if (state==evt::RAWWRITTEN) {
00315     setEvtPrcId(iCell,getpid());
00316     setEvtState(iCell,evt::RAWREADING);
00317   }
00318   return cell;
00319 }
00320 
00321 
00322 //______________________________________________________________________________
00323 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00324 {
00325   waitRecoRead();
00326   unsigned int   iCell   =nextRecoReadIndex();
00327   FUShmRecoCell* cell    =recoCell(iCell);
00328   unsigned int   iRawCell=cell->rawCellIndex();
00329   if (iRawCell<nRawCells_) {
00330     //evt::State_t   state=evtState(iRawCell);
00331     //assert(state==evt::RECOWRITTEN);
00332     setEvtState(iRawCell,evt::SENDING);
00333   }
00334   return cell;
00335 }
00336 
00337 
00338 //______________________________________________________________________________
00339 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00340 {
00341   waitDqmRead();
00342   unsigned int  iCell=nextDqmReadIndex();
00343   FUShmDqmCell* cell=dqmCell(iCell);
00344   dqm::State_t  state=dqmState(iCell);
00345   assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00346   if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00347   return cell;
00348 }
00349 
00350 
00351 //______________________________________________________________________________
00352 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00353 {
00354   waitRawDiscarded();
00355   FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00356   evt::State_t  state=evtState(cell->index());
00357   assert(state==evt::PROCESSED
00358          ||state==evt::SENT
00359          ||state==evt::EMPTY
00360          ||state==evt::STOP
00361          ||state==evt::LUMISECTION);
00362   if (state!=evt::EMPTY 
00363       && state!=evt::LUMISECTION
00364       && state!=evt::STOP
00365       ) 
00366     setEvtState(cell->index(),evt::DISCARDING);
00367   return cell;
00368 }
00369 
00370 
00371 //______________________________________________________________________________
00372 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00373 {
00374   evt::State_t state=evtState(cell->index());
00375   assert(state==evt::RAWWRITING);
00376   setEvtState(cell->index(),evt::RAWWRITTEN);
00377   setEvtNumber(cell->index(),cell->evtNumber());
00378   postRawIndexToRead(cell->index());
00379   if (segmentationMode_) shmdt(cell);
00380   postRawRead();
00381 }
00382 
00383 
00384 //______________________________________________________________________________
00385 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00386 {
00387   evt::State_t state=evtState(cell->index());
00388   assert(state==evt::RAWREADING);
00389   setEvtState(cell->index(),evt::RAWREAD);
00390   setEvtState(cell->index(),evt::PROCESSING);
00391   setEvtTimeStamp(cell->index(),time(0));
00392   if (segmentationMode_) shmdt(cell);
00393 }
00394 
00395 
00396 //______________________________________________________________________________
00397 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00398 {
00399   unsigned int iRawCell=cell->rawCellIndex();
00400   if (iRawCell<nRawCells_) {
00401     //evt::State_t state=evtState(cell->rawCellIndex());
00402     //assert(state==evt::SENDING);
00403     setEvtState(cell->rawCellIndex(),evt::SENT);
00404   }
00405   if (segmentationMode_) shmdt(cell);
00406 }
00407 
00408 
00409 //______________________________________________________________________________
00410 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00411 {
00412   dqm::State_t state=dqmState(cell->index());
00413   assert(state==dqm::SENDING||state==dqm::EMPTY);
00414   if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00415   if (segmentationMode_) shmdt(cell);
00416 }
00417 
00418 
00419 //______________________________________________________________________________
00420 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00421 {
00422   waitRawDiscard();
00423   if (rawCellReadyForDiscard(iCell)) {
00424     rawDiscardIndex_=iCell;
00425     evt::State_t  state=evtState(iCell);
00426     assert(state==evt::PROCESSING
00427            ||state==evt::SENT
00428            ||state==evt::EMPTY
00429            ||state==evt::STOP
00430            ||state==evt::LUMISECTION
00431            );
00432     if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00433     postRawDiscarded();
00434   }
00435   else postRawDiscard();
00436 }
00437 
00438 
00439 //______________________________________________________________________________
00440 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell)
00441 {
00442   waitRawDiscard();
00443   if (rawCellReadyForDiscard(iCell)) {
00444     rawDiscardIndex_=iCell;
00445     evt::State_t  state=evtState(iCell);
00446     assert(state==evt::RAWWRITTEN
00447            );
00448     setEvtState(iCell,evt::PROCESSED);
00449     postRawDiscarded();
00450   }
00451   else postRawDiscard();
00452 }
00453 
00454 
00455 //______________________________________________________________________________
00456 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00457 {
00458   releaseRawCell(cell);
00459   postRawDiscard();
00460 }
00461 
00462 
00463 //______________________________________________________________________________
00464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00465 {
00466   FUShmRecoCell* cell=recoCell(iCell);
00467   unsigned int iRawCell=cell->rawCellIndex();
00468   if (iRawCell<nRawCells_) {
00469     //evt::State_t state=evtState(iRawCell);
00470     //assert(state==evt::SENT);
00471     scheduleRawCellForDiscard(iRawCell);
00472   }
00473   cell->clear();
00474   if (segmentationMode_) shmdt(cell);
00475   postRecoIndexToWrite(iCell);
00476   postRecoWrite();
00477 }
00478 
00479 //______________________________________________________________________________
00480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
00481 {
00482   FUShmRecoCell* cell=recoCell(iCell);
00483   cell->clear();
00484   if (segmentationMode_) shmdt(cell);
00485   postRecoIndexToWrite(iCell);
00486   postRecoWrite();
00487 }
00488 
00489 
00490 //______________________________________________________________________________
00491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00492 {
00493   dqm::State_t state=dqmState(iCell);
00494   assert(state==dqm::EMPTY||state==dqm::SENT);
00495   setDqmState(iCell,dqm::DISCARDING);
00496   FUShmDqmCell* cell=dqmCell(iCell);
00497   cell->clear();
00498   if (segmentationMode_) shmdt(cell);
00499   setDqmState(iCell,dqm::EMPTY);
00500   postDqmIndexToWrite(iCell);
00501   postDqmWrite();
00502 }
00503 
00504 
00505 //______________________________________________________________________________
00506 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00507 {
00508   evt::State_t state=evtState(cell->index());
00509   if(!(
00510      state==evt::DISCARDING
00511      ||state==evt::RAWWRITING
00512      ||state==evt::EMPTY
00513      ||state==evt::STOP
00514      ||state==evt::LUMISECTION
00515      )) std::cout << "=================releaseRawCell state " << state 
00516                   << std::endl;
00517 
00518   assert(
00519          state==evt::DISCARDING
00520          ||state==evt::RAWWRITING
00521          ||state==evt::EMPTY
00522          ||state==evt::STOP
00523          ||state==evt::LUMISECTION
00524          );
00525   setEvtState(cell->index(),evt::EMPTY);
00526   setEvtDiscard(cell->index(),0);
00527   setEvtNumber(cell->index(),0xffffffff);
00528   setEvtPrcId(cell->index(),0);
00529   setEvtTimeStamp(cell->index(),0);
00530   cell->clear();
00531   postRawIndexToWrite(cell->index());
00532   if (segmentationMode_) shmdt(cell);
00533   postRawWrite();
00534 }
00535 
00536 
00537 //______________________________________________________________________________
00538 void FUShmBuffer::writeRawEmptyEvent()
00539 {
00540   FUShmRawCell* cell=rawCellToWrite();
00541   evt::State_t state=evtState(cell->index());
00542   assert(state==evt::RAWWRITING);
00543   setEvtState(cell->index(),evt::STOP);
00544   postRawIndexToRead(cell->index());
00545   if (segmentationMode_) shmdt(cell);
00546   postRawRead();
00547 }
00548 
00549 //______________________________________________________________________________
00550 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls)
00551 {
00552   FUShmRawCell* cell=rawCellToWrite();
00553   cell->setLumiSection(ls);
00554   evt::State_t state=evtState(cell->index());
00555   assert(state==evt::RAWWRITING);
00556   setEvtState(cell->index(),evt::LUMISECTION);
00557   postRawIndexToRead(cell->index());
00558   if (segmentationMode_) shmdt(cell);
00559   postRawRead();
00560 }
00561 
00562 
00563 //______________________________________________________________________________
00564 void FUShmBuffer::writeRecoEmptyEvent()
00565 {
00566   waitRecoWrite();
00567   unsigned int   iCell=nextRecoWriteIndex();
00568   FUShmRecoCell* cell =recoCell(iCell);
00569   cell->clear();
00570   postRecoIndexToRead(iCell);
00571   if (segmentationMode_) shmdt(cell);
00572   postRecoRead();
00573 }
00574 
00575 
00576 //______________________________________________________________________________
00577 void FUShmBuffer::writeDqmEmptyEvent()
00578 {
00579   waitDqmWrite();
00580   unsigned int  iCell=nextDqmWriteIndex();
00581   FUShmDqmCell* cell=dqmCell(iCell);
00582   cell->clear();
00583   postDqmIndexToRead(iCell);
00584   if (segmentationMode_) shmdt(cell);
00585   postDqmRead();
00586 }
00587 
00588 
00589 //______________________________________________________________________________
00590 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00591 {
00592   FUShmRawCell* cell=rawCellToWrite();
00593   rawDiscardIndex_=cell->index();
00594   setEvtState(cell->index(),evt::STOP);
00595   setEvtNumber(cell->index(),0xffffffff);
00596   setEvtPrcId(cell->index(),0);
00597   setEvtTimeStamp(cell->index(),0);
00598   postRawDiscarded();
00599 }
00600 
00601 
00602 //______________________________________________________________________________
00603 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00604 {
00605   waitRawDiscard();
00606   if (rawCellReadyForDiscard(cell->index())) {
00607     rawDiscardIndex_=cell->index();
00608     setEvtState(cell->index(),evt::STOP);
00609     setEvtNumber(cell->index(),0xffffffff);
00610     setEvtPrcId(cell->index(),0);
00611     setEvtTimeStamp(cell->index(),0);
00612     removeClientPrcId(getpid());
00613     if (segmentationMode_) shmdt(cell);
00614     postRawDiscarded();
00615   }
00616   else postRawDiscard();
00617 }
00618 
00619 //______________________________________________________________________________
00620 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell)
00621 {
00622   //  waitRawDiscard();
00623   if (rawCellReadyForDiscard(cell->index())) {
00624     rawDiscardIndex_=cell->index();
00625     setEvtState(cell->index(),evt::STOP);
00626     setEvtNumber(cell->index(),0xffffffff);
00627     setEvtPrcId(cell->index(),0);
00628     setEvtTimeStamp(cell->index(),0);
00629     //    removeClientPrcId(getpid());
00630     if (segmentationMode_) shmdt(cell);
00631     postRawDiscarded();
00632   }
00633   else postRawDiscard();
00634 }
00635 
00636 
00637 //______________________________________________________________________________
00638 bool FUShmBuffer::writeRecoInitMsg(unsigned int   outModId,
00639                                    unsigned int   fuProcessId,
00640                                    unsigned int   fuGuid,
00641                                    unsigned char *data,
00642                                    unsigned int   dataSize)
00643 {
00644   if (dataSize>recoCellPayloadSize_) {
00645     cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00646     return false;
00647   }
00648   
00649   waitRecoWrite();
00650   unsigned int   iCell=nextRecoWriteIndex();
00651   FUShmRecoCell* cell =recoCell(iCell);
00652   cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00653   postRecoIndexToRead(iCell);
00654   if (segmentationMode_) shmdt(cell);
00655   postRecoRead();
00656   return true;
00657 }
00658 
00659 
00660 //______________________________________________________________________________
00661 bool FUShmBuffer::writeRecoEventData(unsigned int   runNumber,
00662                                      unsigned int   evtNumber,
00663                                      unsigned int   outModId,
00664                                      unsigned int   fuProcessId,
00665                                      unsigned int   fuGuid,
00666                                      unsigned char *data,
00667                                      unsigned int   dataSize)
00668 {
00669   if (dataSize>recoCellPayloadSize_) {
00670     cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00671     return false;
00672   }
00673   
00674   waitRecoWrite();
00675   unsigned int   iCell=nextRecoWriteIndex();
00676   FUShmRecoCell* cell =recoCell(iCell);
00677   unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00678   //evt::State_t state=evtState(rawCellIndex);
00679   //assert(state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT);
00680   setEvtState(rawCellIndex,evt::RECOWRITING);
00681   incEvtDiscard(rawCellIndex);
00682   cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00683                        fuProcessId,fuGuid,data,dataSize);
00684   setEvtState(rawCellIndex,evt::RECOWRITTEN);
00685   postRecoIndexToRead(iCell);
00686   if (segmentationMode_) shmdt(cell);
00687   postRecoRead();
00688   return true;
00689 }
00690 
00691 
00692 //______________________________________________________________________________
00693 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00694                                       unsigned int fuProcessId,
00695                                       unsigned int iRawCell)
00696 {
00697   FUShmRawCell *raw=rawCell(iRawCell);
00698 
00699   unsigned int   dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00700   unsigned char *data    =new unsigned char[dataSize];
00701   uint32_t      *pos     =(uint32_t*)data;
00702   // 06-Oct-2008, KAB - added a version number for the error event format.
00703   //
00704   // Version 1 had no version number, so the run number appeared in the
00705   // first value.  So, a reasonable test for version 1 is whether the
00706   // first value is larger than some relatively small cutoff (say 32).
00707   // Version 2 added the lumi block number.
00708   //
00709   *pos++=(uint32_t)2;  // protocol version number
00710   *pos++=(uint32_t)runNumber;
00711   *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00712   *pos++=(uint32_t)raw->evtNumber();
00713   for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00714   memcpy(pos,raw->payloadAddr(),raw->eventSize());
00715   
00716   // DEBUG
00717   /*
00718     if (1) {
00719     stringstream ss;
00720     ss<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".err";
00721     ofstream fout;
00722     fout.open(ss.str().c_str(),ios::out|ios::binary);
00723     if (!fout.write((char*)data,dataSize))
00724     cout<<"Failed to write error event to "<<ss.str()<<endl;
00725     fout.close();
00726     
00727     stringstream ss2;
00728     ss2<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".info";
00729     ofstream fout2;
00730     fout2.open(ss2.str().c_str());
00731     fout2<<"dataSize = "<<dataSize<<endl;
00732     fout2<<"runNumber = "<<runNumber<<endl;
00733     fout2<<"evtNumber = "<<raw->evtNumber()<<endl;
00734     fout2<<"eventSize = "<<raw->eventSize()<<endl;
00735     unsigned int totalSize(0);
00736     for (unsigned int i=0;i<1024;i++) {
00737     unsigned int fedSize = raw->fedSize(i);
00738     totalSize += fedSize;
00739     if (fedSize>0) fout2<<i<<": "<<fedSize<<endl;
00740     }
00741     fout2<<"totalSize = "<<totalSize<<endl;
00742     fout2.close();
00743     }
00744   */// END DEBUG
00745   
00746   waitRecoWrite();
00747   unsigned int   iRecoCell=nextRecoWriteIndex();
00748   FUShmRecoCell* reco     =recoCell(iRecoCell);
00749   setEvtState(iRawCell,evt::RECOWRITING);
00750   setEvtDiscard(iRawCell,1);
00751   reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00752                         data,dataSize);
00753   delete [] data;
00754   setEvtState(iRawCell,evt::RECOWRITTEN);
00755   postRecoIndexToRead(iRecoCell);
00756   if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00757   postRecoRead();
00758   return true;
00759 }
00760 
00761 
00762 //______________________________________________________________________________
00763 bool FUShmBuffer::writeDqmEventData(unsigned int   runNumber,
00764                                     unsigned int   evtAtUpdate,
00765                                     unsigned int   folderId,
00766                                     unsigned int   fuProcessId,
00767                                     unsigned int   fuGuid,
00768                                     unsigned char *data,
00769                                     unsigned int   dataSize)
00770 {
00771   if (dataSize>dqmCellPayloadSize_) {
00772     cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00773     return false;
00774   }
00775   
00776   waitDqmWrite();
00777   unsigned int  iCell=nextDqmWriteIndex();
00778   FUShmDqmCell* cell=dqmCell(iCell);
00779   dqm::State_t state=dqmState(iCell);
00780   assert(state==dqm::EMPTY);
00781   setDqmState(iCell,dqm::WRITING);
00782   cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00783   setDqmState(iCell,dqm::WRITTEN);
00784   postDqmIndexToRead(iCell);
00785   if (segmentationMode_) shmdt(cell);
00786   postDqmRead();
00787   return true;
00788 }
00789 
00790 
00791 //______________________________________________________________________________
00792 void FUShmBuffer::sem_print()
00793 {
00794   cout<<"--> current sem values:"
00795       <<endl
00796       <<" lock="<<semctl(semid(),0,GETVAL)
00797       <<endl
00798       <<" wraw="<<semctl(semid(),1,GETVAL)
00799       <<" rraw="<<semctl(semid(),2,GETVAL)
00800       <<endl
00801       <<" wdsc="<<semctl(semid(),3,GETVAL)
00802       <<" rdsc="<<semctl(semid(),4,GETVAL)
00803       <<endl
00804       <<" wrec="<<semctl(semid(),5,GETVAL)
00805       <<" rrec="<<semctl(semid(),6,GETVAL)
00806       <<endl
00807       <<" wdqm="<<semctl(semid(),7,GETVAL)
00808       <<" rdqm="<<semctl(semid(),8,GETVAL)
00809       <<endl;
00810 }
00811 
00812 
00813 //______________________________________________________________________________
00814 void FUShmBuffer::printEvtState(unsigned int index)
00815 {
00816   evt::State_t state=evtState(index);
00817   std::string stateName;
00818   if      (state==evt::EMPTY)      stateName="EMPTY";
00819   else if (state==evt::STOP)       stateName="STOP";
00820   else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00821   else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00822   else if (state==evt::RAWREADING) stateName="RAWREADING";
00823   else if (state==evt::RAWREAD)    stateName="RAWREAD";
00824   else if (state==evt::PROCESSING) stateName="PROCESSING";
00825   else if (state==evt::PROCESSED)  stateName="PROCESSED";
00826   else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00827   else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00828   else if (state==evt::SENDING)    stateName="SENDING";
00829   else if (state==evt::SENT)       stateName="SENT";
00830   else if (state==evt::DISCARDING) stateName="DISCARDING";
00831   cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00832 }
00833 
00834 
00835 //______________________________________________________________________________
00836 void FUShmBuffer::printDqmState(unsigned int index)
00837 {
00838   dqm::State_t state=dqmState(index);
00839   cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00840 }
00841 
00842 
00843 //______________________________________________________________________________
00844 FUShmBuffer* FUShmBuffer::createShmBuffer(bool         segmentationMode,
00845                                           unsigned int nRawCells,
00846                                           unsigned int nRecoCells,
00847                                           unsigned int nDqmCells,
00848                                           unsigned int rawCellSize,
00849                                           unsigned int recoCellSize,
00850                                           unsigned int dqmCellSize)
00851 {
00852   // if necessary, release shared memory first!
00853   if (FUShmBuffer::releaseSharedMemory())
00854     cout<<"FUShmBuffer::createShmBuffer: "
00855         <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00856         <<endl;
00857   
00858   // create bookkeeping shared memory segment
00859   int  size =sizeof(unsigned int)*7;
00860   int  shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00861   void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00862   
00863   if(1!=shm_nattch(shmid)) {
00864     cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00865     shmdt(shmAddr);
00866     return 0;
00867   }
00868   
00869   unsigned int* p=(unsigned int*)shmAddr;
00870   *p++=segmentationMode;
00871   *p++=nRawCells;
00872   *p++=nRecoCells;
00873   *p++=nDqmCells;
00874   *p++=rawCellSize;
00875   *p++=recoCellSize;
00876   *p++=dqmCellSize;
00877   shmdt(shmAddr);
00878   
00879   // create the 'real' shared memory buffer
00880   size     =FUShmBuffer::size(segmentationMode,
00881                               nRawCells,nRecoCells,nDqmCells,
00882                               rawCellSize,recoCellSize,dqmCellSize);
00883   shmid    =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00884   int semid=sem_create(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00885   shmAddr  =shm_attach(shmid);                         if (0==shmAddr) return 0;
00886   
00887   if (1!=shm_nattch(shmid)) {
00888     cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00889     shmdt(shmAddr);
00890     return 0;
00891   }
00892   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00893                                                nRawCells,nRecoCells,nDqmCells,
00894                                                rawCellSize,recoCellSize,dqmCellSize);
00895   
00896   cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00897   cout<<"                                segmentationMode="<<segmentationMode<<endl;
00898   
00899   buffer->initialize(shmid,semid);
00900   
00901   return buffer;
00902 }
00903 
00904 
00905 //______________________________________________________________________________
00906 FUShmBuffer* FUShmBuffer::getShmBuffer()
00907 {
00908   // get bookkeeping shared memory segment
00909   int   size   =sizeof(unsigned int)*7;
00910   int   shmid  =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00911   void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00912   
00913   unsigned int *p=(unsigned int*)shmAddr;
00914   bool          segmentationMode=*p++;
00915   unsigned int  nRawCells       =*p++;
00916   unsigned int  nRecoCells      =*p++;
00917   unsigned int  nDqmCells       =*p++;
00918   unsigned int  rawCellSize     =*p++;
00919   unsigned int  recoCellSize    =*p++;
00920   unsigned int  dqmCellSize     =*p++;
00921   shmdt(shmAddr);
00922 
00923   cout<<"FUShmBuffer::getShmBuffer():"
00924       <<" segmentationMode="<<segmentationMode
00925       <<" nRawCells="<<nRawCells
00926       <<" nRecoCells="<<nRecoCells
00927       <<" nDqmCells="<<nDqmCells
00928       <<" rawCellSize="<<rawCellSize
00929       <<" recoCellSize="<<recoCellSize
00930       <<" dqmCellSize="<<dqmCellSize
00931       <<endl;
00932   
00933   // get the 'real' shared memory buffer
00934   size     =FUShmBuffer::size(segmentationMode,
00935                               nRawCells,nRecoCells,nDqmCells,
00936                               rawCellSize,recoCellSize,dqmCellSize);
00937   shmid    =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00938   int semid=sem_get(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00939   shmAddr  =shm_attach(shmid);                      if (0==shmAddr) return 0;
00940   
00941   if (0==shm_nattch(shmid)) {
00942     cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00943     return 0;
00944   }
00945   
00946   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00947                                                nRawCells,nRecoCells,nDqmCells,
00948                                                rawCellSize,recoCellSize,dqmCellSize);
00949   
00950   cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00951   cout<<"                             segmentationMode="<<segmentationMode<<endl;
00952   
00953   buffer->setClientPrcId(getpid());
00954 
00955   return buffer;
00956 }
00957 
00958 
00959 //______________________________________________________________________________
00960 bool FUShmBuffer::releaseSharedMemory()
00961 {
00962   // get bookkeeping shared memory segment
00963   int   size   =sizeof(unsigned int)*7;
00964   int   shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00965   void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00966 
00967   unsigned int*p=(unsigned int*)shmAddr;
00968   bool         segmentationMode=*p++;
00969   unsigned int nRawCells       =*p++;
00970   unsigned int nRecoCells      =*p++;
00971   unsigned int nDqmCells       =*p++;
00972   unsigned int rawCellSize     =*p++;
00973   unsigned int recoCellSize    =*p++;
00974   unsigned int dqmCellSize     =*p++;
00975   shmdt(shmAddr);
00976 
00977   
00978   // get the 'real' shared memory segment
00979   size     =FUShmBuffer::size(segmentationMode,
00980                               nRawCells,nRecoCells,nDqmCells,
00981                               rawCellSize,recoCellSize,dqmCellSize);
00982   int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0)    return false;
00983   int semid=sem_get(FUShmBuffer::getSemKey(),9);   if (semid<0)    return false;
00984   shmAddr  =shm_attach(shmid);                     if (0==shmAddr) return false; 
00985 
00986   int att = 0;
00987   for(; att <10; att++)
00988     {
00989       if(shm_nattch(shmid)>1) {
00990         cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
00991              <<", failed attempt to release shared memory."<<endl;
00992 	::sleep(1);
00993       }
00994       else
00995         break;
00996     }
00997 
00998   if(att>=10) return false;
00999 
01000   if (segmentationMode) {
01001     FUShmBuffer* buffer=
01002       new (shmAddr) FUShmBuffer(segmentationMode,
01003                                 nRawCells,nRecoCells,nDqmCells,
01004                                 rawCellSize,recoCellSize,dqmCellSize);
01005     int cellid;
01006     for (unsigned int i=0;i<nRawCells;i++) {
01007       cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
01008       if ((shm_destroy(cellid)==-1)) return false;
01009     }
01010     for (unsigned int i=0;i<nRecoCells;i++) {
01011       cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
01012       if ((shm_destroy(cellid)==-1)) return false;
01013     }
01014     for (unsigned int i=0;i<nDqmCells;i++) {
01015       cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
01016       if ((shm_destroy(cellid)==-1)) return false;
01017     }
01018   }
01019   shmdt(shmAddr);
01020   if (sem_destroy(semid)==-1)  return false;
01021   if (shm_destroy(shmid)==-1)  return false;
01022   if (shm_destroy(shmidd)==-1) return false;
01023 
01024   return true;
01025 }
01026 
01027 
01028 //______________________________________________________________________________
01029 unsigned int FUShmBuffer::size(bool         segmentationMode,
01030                                unsigned int nRawCells,
01031                                unsigned int nRecoCells,
01032                                unsigned int nDqmCells,
01033                                unsigned int rawCellSize,
01034                                unsigned int recoCellSize,
01035                                unsigned int dqmCellSize)
01036 {
01037   unsigned int offset=
01038     sizeof(FUShmBuffer)+
01039     sizeof(unsigned int)*4*nRawCells+
01040     sizeof(evt::State_t)*nRawCells+
01041     sizeof(dqm::State_t)*nDqmCells;
01042   
01043   unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
01044   unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
01045   unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
01046   
01047   unsigned int realSize =
01048     (segmentationMode) ?
01049     offset
01050     +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
01051     :
01052     offset
01053     +rawCellTotalSize*nRawCells
01054     +recoCellTotalSize*nRecoCells
01055     +dqmCellTotalSize*nDqmCells;
01056 
01057   unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
01058   
01059   return result;
01060 }
01061 
01062 
01063 //______________________________________________________________________________
01064 key_t FUShmBuffer::getShmDescriptorKey()
01065 {
01066   key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
01067   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
01068                              <<"for file "<<shmKeyPath_<<"!"<<endl;
01069   return result;
01070 }
01071 
01072 
01073 //______________________________________________________________________________
01074 key_t FUShmBuffer::getShmKey()
01075 {
01076   key_t result=getuid()*1000+SHM_KEYID;
01077   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
01078                              <<"for file "<<shmKeyPath_<<"!"<<endl;
01079   return result;
01080 }
01081 
01082 
01083 //______________________________________________________________________________
01084 key_t FUShmBuffer::getSemKey()
01085 {
01086   key_t result=getuid()*1000+SEM_KEYID;
01087   if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
01088                              <<"for file "<<semKeyPath_<<"!"<<endl;
01089   return result;
01090 }
01091 
01092 
01093 //______________________________________________________________________________
01094 int FUShmBuffer::shm_create(key_t key,int size)
01095 {
01096   // first check and possibly remove existing segment with same id
01097   int shmid=shmget(key,1,0644);//using minimal size any segment with key "key" will be connected
01098   if(shmid!=-1){
01099     // an existing segment was found, remove it
01100     shmid_ds shmstat;
01101     int result=shmctl(shmid,IPC_STAT,&shmstat);
01102     cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01103          << " created by process " << shmstat.shm_cpid << " owned by "
01104          << shmstat.shm_perm.uid << " permissions " 
01105          << hex << shmstat.shm_perm.mode << dec << endl;
01106     result=shmctl(shmid,IPC_RMID,&shmstat);
01107   }
01108   shmid=shmget(key,size,IPC_CREAT|0644);
01109   if (shmid==-1) {
01110     int err=errno;
01111     cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01112         <<strerror(err)<<endl;
01113   }
01114   return shmid;
01115 }
01116 
01117 
01118 //______________________________________________________________________________
01119 int FUShmBuffer::shm_get(key_t key,int size)
01120 {
01121   int shmid=shmget(key,size,0644);
01122   if (shmid==-1) {
01123     int err=errno;
01124     cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01125         <<strerror(err)<<endl;
01126   }
01127   return shmid;
01128 }
01129 
01130 
01131 //______________________________________________________________________________
01132 void* FUShmBuffer::shm_attach(int shmid)
01133 {
01134   void* result=shmat(shmid,NULL,0);
01135   if (0==result) {
01136     int err=errno;
01137     cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01138         <<strerror(err)<<endl;
01139   }
01140   return result;
01141 }
01142 
01143 
01144 //______________________________________________________________________________
01145 int FUShmBuffer::shm_nattch(int shmid)
01146 {
01147   shmid_ds shmstat;
01148   shmctl(shmid,IPC_STAT,&shmstat);
01149   return shmstat.shm_nattch;
01150 }
01151 
01152 
01153 //______________________________________________________________________________
01154 int FUShmBuffer::shm_destroy(int shmid)
01155 {
01156   shmid_ds shmstat;
01157   int result=shmctl(shmid,IPC_RMID,&shmstat);
01158   if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01159   return result;
01160 }
01161 
01162 
01163 //______________________________________________________________________________
01164 int FUShmBuffer::sem_create(key_t key,int nsem)
01165 {
01166   int semid=semget(key,nsem,IPC_CREAT|0666);
01167   if (semid<0) {
01168     int err=errno;
01169     cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01170         <<strerror(err)<<endl;
01171   }
01172   return semid;
01173 }
01174 
01175 
01176 //______________________________________________________________________________
01177 int FUShmBuffer::sem_get(key_t key,int nsem)
01178 {
01179   int semid=semget(key,nsem,0666);
01180   if (semid<0) {
01181     int err=errno;
01182     cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01183         <<strerror(err)<<endl;
01184   }
01185   return semid;
01186 }
01187 
01188 
01189 //______________________________________________________________________________
01190 int FUShmBuffer::sem_destroy(int semid)
01191 {
01192   int result=semctl(semid,0,IPC_RMID);
01193   if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01194   return result;
01195 }
01196 
01197 
01198 
01200 // implementation of private member functions
01202 
01203 //______________________________________________________________________________
01204 unsigned int FUShmBuffer::nextIndex(unsigned int  offset,
01205                                     unsigned int  nCells,
01206                                     unsigned int& iNext)
01207 {
01208   lock();
01209   unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01210   pindex+=iNext;
01211   iNext=(iNext+1)%nCells;
01212   unsigned int result=*pindex;
01213   unlock();
01214   return result;
01215 }
01216 
01217 
01218 //______________________________________________________________________________
01219 void FUShmBuffer::postIndex(unsigned int  index,
01220                             unsigned int  offset,
01221                             unsigned int  nCells,
01222                             unsigned int& iLast)
01223 {
01224   lock();
01225   unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01226   pindex+=iLast;
01227   *pindex=index;
01228   iLast=(iLast+1)%nCells;
01229   unlock();
01230 }
01231 
01232 
01233 //______________________________________________________________________________
01234 unsigned int FUShmBuffer::nextRawWriteIndex()
01235 {
01236   return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01237 }
01238 
01239 
01240 //______________________________________________________________________________
01241 unsigned int FUShmBuffer::nextRawReadIndex()
01242 {
01243   return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01244 }
01245 
01246 
01247 //______________________________________________________________________________
01248 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01249 {
01250   postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01251 }
01252 
01253 
01254 //______________________________________________________________________________
01255 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01256 {
01257   postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01258 }
01259 
01260 
01261 //______________________________________________________________________________
01262 unsigned int FUShmBuffer::nextRecoWriteIndex()
01263 {
01264   return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01265 }
01266 
01267 
01268 //______________________________________________________________________________
01269 unsigned int FUShmBuffer::nextRecoReadIndex()
01270 {
01271   return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01272 }
01273 
01274 
01275 //______________________________________________________________________________
01276 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01277 {
01278   postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01279 }
01280 
01281 
01282 //______________________________________________________________________________
01283 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01284 {
01285   postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01286 }
01287 
01288 
01289 //______________________________________________________________________________
01290 unsigned int FUShmBuffer::nextDqmWriteIndex()
01291 {
01292   return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01293 }
01294 
01295 
01296 //______________________________________________________________________________
01297 unsigned int FUShmBuffer::nextDqmReadIndex()
01298 {
01299   return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01300 }
01301 
01302 
01303 //______________________________________________________________________________
01304 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01305 {
01306   postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01307 }
01308 
01309 
01310 //______________________________________________________________________________
01311 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01312 {
01313   postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01314 }
01315 
01316 
01317 //______________________________________________________________________________
01318 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01319 {
01320   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01321   for (unsigned int i=0;i<nRawCells_;i++) {
01322     if ((*pevt++)==evtNumber) return i;
01323   }
01324   assert(false);
01325   return 0xffffffff;
01326 }
01327 
01328 
01329 //______________________________________________________________________________
01330 evt::State_t FUShmBuffer::evtState(unsigned int index)
01331 {
01332   assert(index<nRawCells_);
01333   evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01334   pstate+=index;
01335   return *pstate;
01336 }
01337 
01338 
01339 //______________________________________________________________________________
01340 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01341 {
01342   assert(index<nDqmCells_);
01343   dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01344   pstate+=index;
01345   return *pstate;
01346 }
01347 
01348 
01349 //______________________________________________________________________________
01350 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01351 {
01352   assert(index<nRawCells_);
01353   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01354   pevt+=index;
01355   return *pevt;
01356 }
01357 
01358 
01359 //______________________________________________________________________________
01360 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01361 {
01362   assert(index<nRawCells_);
01363   pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01364   prcid+=index;
01365   return *prcid;
01366 }
01367 
01368 
01369 //______________________________________________________________________________
01370 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01371 {
01372   assert(index<nRawCells_);
01373   time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01374   ptstmp+=index;
01375   return *ptstmp;
01376 }
01377 
01378 
01379 //______________________________________________________________________________
01380 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01381 {
01382   assert(index<nClientsMax_);
01383   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01384   prcid+=index;
01385   return *prcid;
01386 }
01387 
01388 
01389 //______________________________________________________________________________
01390 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01391 {
01392   assert(index<nRawCells_);
01393   evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01394   pstate+=index;
01395   lock();
01396   *pstate=state;
01397   unlock();
01398   return true;
01399 }
01400 
01401 
01402 //______________________________________________________________________________
01403 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01404 {
01405   assert(index<nDqmCells_);
01406   dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01407   pstate+=index;
01408   lock();
01409   *pstate=state;
01410   unlock();
01411   return true;
01412 }
01413 
01414 
01415 //______________________________________________________________________________
01416 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01417 {
01418   assert(index<nRawCells_);
01419   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01420   pcount+=index;
01421   lock();
01422   *pcount=discard;
01423   unlock();
01424   return true;
01425 }
01426 
01427 
01428 //______________________________________________________________________________
01429 int FUShmBuffer::incEvtDiscard(unsigned int index)
01430 {
01431   int result = 0;
01432   assert(index<nRawCells_);
01433   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01434   pcount+=index;
01435   lock();
01436   (*pcount)++;
01437   result = *pcount;
01438   unlock();
01439   return result;
01440 }
01441 
01442 
01443 //______________________________________________________________________________
01444 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01445 {
01446   assert(index<nRawCells_);
01447   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01448   pevt+=index;
01449   lock();
01450   *pevt=evtNumber;
01451   unlock();
01452   return true;
01453 }
01454 
01455 
01456 //______________________________________________________________________________
01457 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01458 {
01459   assert(index<nRawCells_);
01460   pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01461   prcid+=index;
01462   lock();
01463   *prcid=prcId;
01464   unlock();
01465   return true;
01466 }
01467 
01468 
01469 //______________________________________________________________________________
01470 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01471 {
01472   assert(index<nRawCells_);
01473   time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01474   ptstmp+=index;
01475   lock();
01476   *ptstmp=timeStamp;
01477   unlock();
01478   return true;
01479 }
01480 
01481 
01482 //______________________________________________________________________________
01483 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01484 {
01485   lock();
01486   assert(nClients_<nClientsMax_);
01487   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01488   for (unsigned int i=0;i<nClients_;i++) {
01489     if ((*prcid)==prcId) { unlock();  return false; }
01490     prcid++;
01491   }
01492   nClients_++;
01493   *prcid=prcId;
01494   unlock();
01495   return true;
01496 }
01497 
01498 
01499 //______________________________________________________________________________
01500 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01501 {
01502   lock();
01503   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01504   unsigned int iClient(0);
01505   while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01506   assert(iClient!=nClients_);
01507   pid_t* next=prcid; next++;
01508   while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01509   nClients_--;
01510   unlock();
01511   return true;
01512 }
01513 
01514 
01515 //______________________________________________________________________________
01516 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01517 {
01518   FUShmRawCell* result(0);
01519   
01520   if (iCell>=nRawCells_) {
01521     cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01522         <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01523     return result;
01524   }
01525   
01526   if (segmentationMode_) {
01527     key_t         shmkey  =rawCellShmKey(iCell);
01528     int           shmid   =shm_get(shmkey,rawCellTotalSize_);
01529     void*         cellAddr=shm_attach(shmid);
01530     result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01531   }
01532   else {
01533     result=
01534       (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
01535   }
01536   
01537   return result;
01538 }
01539 
01540 
01541 //______________________________________________________________________________
01542 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01543 {
01544   FUShmRecoCell* result(0);
01545   
01546   if (iCell>=nRecoCells_) {
01547     cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01548         <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01549     return result;
01550   }
01551   
01552   if (segmentationMode_) {
01553     key_t         shmkey  =recoCellShmKey(iCell);
01554     int           shmid   =shm_get(shmkey,recoCellTotalSize_);
01555     void*         cellAddr=shm_attach(shmid);
01556     result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01557   }
01558   else {
01559     result=
01560       (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
01561   }
01562   
01563   return result;
01564 }
01565 
01566 
01567 //______________________________________________________________________________
01568 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01569 {
01570   FUShmDqmCell* result(0);
01571   
01572   if (iCell>=nDqmCells_) {
01573     cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01574         <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01575     return result;
01576   }
01577   
01578   if (segmentationMode_) {
01579     key_t         shmkey  =dqmCellShmKey(iCell);
01580     int           shmid   =shm_get(shmkey,dqmCellTotalSize_);
01581     void*         cellAddr=shm_attach(shmid);
01582     result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01583   }
01584   else {
01585     result=
01586       (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01587   }
01588   
01589   return result;
01590 }
01591 
01592 
01593 //______________________________________________________________________________
01594 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01595 {
01596   assert(index<nRawCells_);
01597   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01598   pcount+=index;
01599   lock();
01600   assert(*pcount>0);
01601   --(*pcount);
01602   bool result=(*pcount==0);
01603   unlock();
01604   return result;
01605 }
01606 
01607 
01608 //______________________________________________________________________________
01609 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01610 {
01611   if (!segmentationMode_) {
01612     cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01613     return -1;
01614   }
01615   key_t* addr=(key_t*)((unsigned long)this+offset);
01616   for (unsigned int i=0;i<iCell;i++) ++addr;
01617   return *addr;
01618 }
01619 
01620 
01621 //______________________________________________________________________________
01622 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01623 {
01624   if (iCell>=nRawCells_) {
01625     cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01626         <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01627     return -1;
01628   }
01629   return shmKey(iCell,rawCellOffset_);
01630 }
01631 
01632 
01633 //______________________________________________________________________________
01634 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01635 {
01636   if (iCell>=nRecoCells_) {
01637     cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01638         <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01639     return -1;
01640   }
01641   return shmKey(iCell,recoCellOffset_);
01642 }
01643 
01644 
01645 //______________________________________________________________________________
01646 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01647 {
01648   if (iCell>=nDqmCells_) {
01649     cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01650         <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01651     return -1;
01652   }
01653   return shmKey(iCell,dqmCellOffset_);
01654 }
01655 
01656 
01657 //______________________________________________________________________________
01658 void FUShmBuffer::sem_init(int isem,int value)
01659 {
01660   if (semctl(semid(),isem,SETVAL,value)<0) {
01661     cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01662   }
01663 }
01664 
01665 
01666 //______________________________________________________________________________
01667 int FUShmBuffer::sem_wait(int isem)
01668 {
01669   struct sembuf sops[1];
01670   sops[0].sem_num=isem;
01671   sops[0].sem_op =  -1;
01672   sops[0].sem_flg=   0;
01673   if (semop(semid(),sops,1)==-1) {
01674     cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01675     return -1;
01676   }
01677   return 0;
01678 }
01679 
01680 
01681 //______________________________________________________________________________
01682 void FUShmBuffer::sem_post(int isem)
01683 {
01684   struct sembuf sops[1];
01685   sops[0].sem_num=isem;
01686   sops[0].sem_op =   1;
01687   sops[0].sem_flg=   0;
01688   if (semop(semid(),sops,1)==-1) {
01689     cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01690   }
01691 }