CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/ShmBuffer/src/FUShmBuffer.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUShmBuffer
00004 // -----------
00005 //
00006 //            15/11/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013 
00014 #include <unistd.h>
00015 #include <iostream>
00016 #include <string>
00017 #include <cassert>
00018 
00019 #include <sstream>
00020 #include <fstream>
00021 
00022 #include <cstdlib>
00023 #include <cstring>
00024 #include <stdint.h>
00025 
00026 // the shmem keys are henceforth going to be FIXED for a given userid;
00027 // prior to creation, the application will attempt to get ownership
00028 // of existing segments by the same key and destroy them
00029 
00030 #define SHM_DESCRIPTOR_KEYID           1 /* Id used on ftok for 1. shmget key */
00031 #define SHM_KEYID                      2 /* Id used on ftok for 2. shmget key */
00032 #define SEM_KEYID                      1 /* Id used on ftok for semget key    */
00033 
00034 #define NSKIP_MAX                    100
00035 
00036 
00037 using namespace std;
00038 using namespace evf;
00039 
00040 
00041 //obsolete!!!
00042 const char* FUShmBuffer::shmKeyPath_ =
00043   (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045   (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00046 
00047 
00049 // construction/destruction
00051 
00052 //______________________________________________________________________________
00053 FUShmBuffer::FUShmBuffer(bool         segmentationMode,
00054                          unsigned int nRawCells,
00055                          unsigned int nRecoCells,
00056                          unsigned int nDqmCells,
00057                          unsigned int rawCellSize,
00058                          unsigned int recoCellSize,
00059                          unsigned int dqmCellSize)
00060   : segmentationMode_(segmentationMode)
00061   , nClientsMax_(128)
00062   , nRawCells_(nRawCells)
00063   , rawCellPayloadSize_(rawCellSize)
00064   , nRecoCells_(nRecoCells)
00065   , recoCellPayloadSize_(recoCellSize)
00066   , nDqmCells_(nDqmCells)
00067   , dqmCellPayloadSize_(dqmCellSize)
00068 {
00069   rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00070   recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00071   dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00072 
00073   void* addr;
00074   
00075   rawWriteOffset_=sizeof(FUShmBuffer);
00076   addr=(void*)((unsigned long)this+rawWriteOffset_);
00077   new (addr) unsigned int[nRawCells_];
00078   
00079   rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00080   addr=(void*)((unsigned long)this+rawReadOffset_);
00081   new (addr) unsigned int[nRawCells_];
00082  
00083   recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00084   addr=(void*)((unsigned long)this+recoWriteOffset_);
00085   new (addr) unsigned int[nRecoCells_];
00086 
00087   recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00088   addr=(void*)((unsigned long)this+recoReadOffset_);
00089   new (addr) unsigned int[nRecoCells_];
00090 
00091   dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00092   addr=(void*)((unsigned long)this+dqmWriteOffset_);
00093   new (addr) unsigned int[nDqmCells_];
00094 
00095   dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00096   addr=(void*)((unsigned long)this+dqmReadOffset_);
00097   new (addr) unsigned int[nDqmCells_];
00098 
00099   evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00100   addr=(void*)((unsigned long)this+evtStateOffset_);
00101   new (addr) evt::State_t[nRawCells_];
00102   
00103   evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00104   addr=(void*)((unsigned long)this+evtDiscardOffset_);
00105   new (addr) unsigned int[nRawCells_];
00106   
00107   evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00108   addr=(void*)((unsigned long)this+evtNumberOffset_);
00109   new (addr) unsigned int[nRawCells_];
00110   
00111   evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00112   addr=(void*)((unsigned long)this+evtPrcIdOffset_);
00113   new (addr) pid_t[nRawCells_];
00114   
00115   evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00116   addr=(void*)((unsigned long)this+evtTimeStampOffset_);
00117   new (addr) time_t[nRawCells_];
00118   
00119   dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00120   addr=(void*)((unsigned long)this+dqmStateOffset_);
00121   new (addr) dqm::State_t[nDqmCells_];
00122   
00123   clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00124   addr=(void*)((unsigned long)this+clientPrcIdOffset_);
00125   new (addr) pid_t[nClientsMax_];
00126 
00127   rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00128   
00129   if (segmentationMode_) {
00130     recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00131     dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00132     addr=(void*)((unsigned long)this+rawCellOffset_);
00133     new (addr) key_t[nRawCells_];
00134     addr=(void*)((unsigned long)this+recoCellOffset_);
00135     new (addr) key_t[nRecoCells_];
00136     addr=(void*)((unsigned long)this+dqmCellOffset_);
00137     new (addr) key_t[nDqmCells_];
00138   }
00139   else {
00140     recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00141     dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00142     for (unsigned int i=0;i<nRawCells_;i++) {
00143       addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
00144       new (addr) FUShmRawCell(rawCellSize);
00145     }
00146     for (unsigned int i=0;i<nRecoCells_;i++) {
00147       addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
00148       new (addr) FUShmRecoCell(recoCellSize);
00149     }
00150     for (unsigned int i=0;i<nDqmCells_;i++) {
00151       addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
00152       new (addr) FUShmDqmCell(dqmCellSize);
00153     }
00154   }
00155 }
00156 
00157 
00158 //______________________________________________________________________________
00159 FUShmBuffer::~FUShmBuffer()
00160 {
00161   
00162 }
00163 
00164 
00166 // implementation of member functions
00168 
00169 //______________________________________________________________________________
00170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00171 {
00172   shmid_=shmid;
00173   semid_=semid;
00174   
00175   if (segmentationMode_) {
00176     int    shmKeyId=666;
00177     key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
00178     for (unsigned int i=0;i<nRawCells_;i++) {
00179       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00180       int   shmid  =shm_create(*keyAddr,rawCellTotalSize_);
00181       void* shmAddr=shm_attach(shmid);
00182       new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00183       shmdt(shmAddr);
00184       ++keyAddr;
00185     }
00186     keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
00187     for (unsigned int i=0;i<nRecoCells_;i++) {
00188       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00189       int   shmid  =shm_create(*keyAddr,recoCellTotalSize_);
00190       void* shmAddr=shm_attach(shmid);
00191       new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00192       shmdt(shmAddr);
00193       ++keyAddr;
00194     }
00195     keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
00196     for (unsigned int i=0;i<nDqmCells_;i++) {
00197       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00198       int   shmid  =shm_create(*keyAddr,dqmCellTotalSize_);
00199       void* shmAddr=shm_attach(shmid);
00200       new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00201       shmdt(shmAddr);
00202       ++keyAddr;
00203     }
00204   }
00205   
00206   for (unsigned int i=0;i<nRawCells_;i++) {
00207     FUShmRawCell* cell=rawCell(i);
00208     cell->initialize(i);
00209     if (segmentationMode_) shmdt(cell);
00210   }
00211   
00212   for (unsigned int i=0;i<nRecoCells_;i++) {
00213     FUShmRecoCell* cell=recoCell(i);
00214     cell->initialize(i);
00215     if (segmentationMode_) shmdt(cell);
00216   }
00217   
00218   for (unsigned int i=0;i<nDqmCells_;i++) {
00219     FUShmDqmCell* cell=dqmCell(i);
00220     cell->initialize(i);
00221     if (segmentationMode_) shmdt(cell);
00222   }
00223 
00224   reset();
00225 }
00226 
00227 
00228 //______________________________________________________________________________
00229 void FUShmBuffer::reset()
00230 {
00231   nClients_=0;
00232 
00233   // setup ipc semaphores
00234   sem_init(0,1);          // lock (binary)
00235   sem_init(1,nRawCells_); // raw  write semaphore
00236   sem_init(2,0);          // raw  read  semaphore
00237   sem_init(3,1);          // binary semaphore to schedule raw event for discard
00238   sem_init(4,0);          // binary semaphore to discard raw event
00239   sem_init(5,nRecoCells_);// reco write semaphore
00240   sem_init(6,0);          // reco send (read) semaphore
00241   sem_init(7,nDqmCells_); // dqm  write semaphore
00242   sem_init(8,0);          // dqm  send (read) semaphore
00243 
00244   sem_print();
00245 
00246   unsigned int *iWrite,*iRead;
00247   
00248   rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00249   iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
00250   iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
00251   for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00252 
00253   recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00254   iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
00255   iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
00256   for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00257 
00258   dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00259   iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
00260   iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
00261   for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00262   
00263   for (unsigned int i=0;i<nRawCells_;i++) {
00264     setEvtState(i,evt::EMPTY);
00265     setEvtDiscard(i,0);
00266     setEvtNumber(i,0xffffffff);
00267     setEvtPrcId(i,0);
00268     setEvtTimeStamp(i,0);
00269   }
00270 
00271   for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00272 }
00273 
00274 
00275 //______________________________________________________________________________
00276 unsigned int FUShmBuffer::nbRawCellsToWrite() const
00277 {
00278   return semctl(semid(),1,GETVAL);
00279 }
00280 
00281 
00282 //______________________________________________________________________________
00283 int FUShmBuffer::nbRawCellsToRead() const
00284 {
00285   return semctl(semid(),2,GETVAL);
00286 }
00287 
00288 
00289 //______________________________________________________________________________
00290 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00291 {
00292   if(waitRawWrite()!=0) return 0;
00293   unsigned int  iCell=nextRawWriteIndex();
00294   FUShmRawCell* cell =rawCell(iCell);
00295   evt::State_t  state=evtState(iCell);
00296   assert(state==evt::EMPTY);
00297   setEvtState(iCell,evt::RAWWRITING);
00298   setEvtDiscard(iCell,1);
00299   return cell;
00300 }
00301 
00302 
00303 //______________________________________________________________________________
00304 FUShmRawCell* FUShmBuffer::rawCellToRead()
00305 {
00306   waitRawRead();
00307   unsigned int iCell=nextRawReadIndex();
00308   FUShmRawCell* cell=rawCell(iCell);
00309   evt::State_t  state=evtState(iCell);
00310   assert(state==evt::RAWWRITTEN
00311          ||state==evt::EMPTY
00312          ||state==evt::STOP
00313          ||state==evt::LUMISECTION);
00314   if (state==evt::RAWWRITTEN) {
00315     setEvtPrcId(iCell,getpid());
00316     setEvtState(iCell,evt::RAWREADING);
00317   }
00318   return cell;
00319 }
00320 
00321 
00322 //______________________________________________________________________________
00323 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00324 {
00325   waitRecoRead();
00326   unsigned int   iCell   =nextRecoReadIndex();
00327   FUShmRecoCell* cell    =recoCell(iCell);
00328   unsigned int   iRawCell=cell->rawCellIndex();
00329   if (iRawCell<nRawCells_) {
00330     //evt::State_t   state=evtState(iRawCell);
00331     //assert(state==evt::RECOWRITTEN);
00332     setEvtState(iRawCell,evt::SENDING);
00333   }
00334   return cell;
00335 }
00336 
00337 
00338 //______________________________________________________________________________
00339 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00340 {
00341   waitDqmRead();
00342   unsigned int  iCell=nextDqmReadIndex();
00343   FUShmDqmCell* cell=dqmCell(iCell);
00344   dqm::State_t  state=dqmState(iCell);
00345   assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00346   if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00347   return cell;
00348 }
00349 
00350 
00351 //______________________________________________________________________________
00352 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00353 {
00354   waitRawDiscarded();
00355   FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00356   evt::State_t  state=evtState(cell->index());
00357   assert(state==evt::PROCESSED
00358          ||state==evt::SENT
00359          ||state==evt::EMPTY
00360          ||state==evt::STOP
00361          ||state==evt::USEDLS);
00362   if (state!=evt::EMPTY 
00363       && state!=evt::USEDLS
00364       && state!=evt::STOP
00365       ) 
00366     setEvtState(cell->index(),evt::DISCARDING);
00367   return cell;
00368 }
00369 
00370 
00371 //______________________________________________________________________________
00372 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00373 {
00374   evt::State_t state=evtState(cell->index());
00375   assert(state==evt::RAWWRITING);
00376   setEvtState(cell->index(),evt::RAWWRITTEN);
00377   setEvtNumber(cell->index(),cell->evtNumber());
00378   postRawIndexToRead(cell->index());
00379   if (segmentationMode_) shmdt(cell);
00380   postRawRead();
00381 }
00382 
00383 
00384 //______________________________________________________________________________
00385 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00386 {
00387   evt::State_t state=evtState(cell->index());
00388   assert(state==evt::RAWREADING);
00389   setEvtState(cell->index(),evt::RAWREAD);
00390   setEvtState(cell->index(),evt::PROCESSING);
00391   setEvtTimeStamp(cell->index(),time(0));
00392   if (segmentationMode_) shmdt(cell);
00393 }
00394 
00395 
00396 //______________________________________________________________________________
00397 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00398 {
00399   unsigned int iRawCell=cell->rawCellIndex();
00400   if (iRawCell<nRawCells_) {
00401     //evt::State_t state=evtState(cell->rawCellIndex());
00402     //assert(state==evt::SENDING);
00403     setEvtState(cell->rawCellIndex(),evt::SENT);
00404   }
00405   if (segmentationMode_) shmdt(cell);
00406 }
00407 
00408 
00409 //______________________________________________________________________________
00410 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00411 {
00412   dqm::State_t state=dqmState(cell->index());
00413   assert(state==dqm::SENDING||state==dqm::EMPTY);
00414   if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00415   if (segmentationMode_) shmdt(cell);
00416 }
00417 
00418 
00419 //______________________________________________________________________________
00420 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00421 {
00422   waitRawDiscard();
00423   if (rawCellReadyForDiscard(iCell)) {
00424     rawDiscardIndex_=iCell;
00425     evt::State_t  state=evtState(iCell);
00426     assert(state==evt::PROCESSING
00427            ||state==evt::SENT
00428            ||state==evt::EMPTY
00429            ||state==evt::STOP
00430            ||state==evt::LUMISECTION
00431            );
00432     if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00433     if (state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00434     postRawDiscarded();
00435   }
00436   else postRawDiscard();
00437 }
00438 
00439 
00440 //______________________________________________________________________________
00441 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell)
00442 {
00443   waitRawDiscard();
00444   if (rawCellReadyForDiscard(iCell)) {
00445     rawDiscardIndex_=iCell;
00446     evt::State_t  state=evtState(iCell);
00447     if(state!=evt::LUMISECTION && state!=evt::EMPTY && state!=evt::USEDLS) setEvtState(iCell,evt::PROCESSED);
00448     if(state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00449     postRawDiscarded();
00450   }
00451   else postRawDiscard();
00452 }
00453 
00454 
00455 //______________________________________________________________________________
00456 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00457 {
00458   releaseRawCell(cell);
00459   postRawDiscard();
00460 }
00461 
00462 
00463 //______________________________________________________________________________
00464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00465 {
00466   FUShmRecoCell* cell=recoCell(iCell);
00467   unsigned int iRawCell=cell->rawCellIndex();
00468   if (iRawCell<nRawCells_) {
00469     //evt::State_t state=evtState(iRawCell);
00470     //assert(state==evt::SENT);
00471     scheduleRawCellForDiscard(iRawCell);
00472   }
00473   cell->clear();
00474   if (segmentationMode_) shmdt(cell);
00475   postRecoIndexToWrite(iCell);
00476   postRecoWrite();
00477 }
00478 
00479 //______________________________________________________________________________
00480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
00481 {
00482   FUShmRecoCell* cell=recoCell(iCell);
00483   cell->clear();
00484   if (segmentationMode_) shmdt(cell);
00485   postRecoIndexToWrite(iCell);
00486   postRecoWrite();
00487 }
00488 
00489 
00490 //______________________________________________________________________________
00491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00492 {
00493   dqm::State_t state=dqmState(iCell);
00494   assert(state==dqm::EMPTY||state==dqm::SENT);
00495   setDqmState(iCell,dqm::DISCARDING);
00496   FUShmDqmCell* cell=dqmCell(iCell);
00497   cell->clear();
00498   if (segmentationMode_) shmdt(cell);
00499   setDqmState(iCell,dqm::EMPTY);
00500   postDqmIndexToWrite(iCell);
00501   postDqmWrite();
00502 }
00503 
00504 
00505 //______________________________________________________________________________
00506 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00507 {
00508   evt::State_t state=evtState(cell->index());
00509   if(!(
00510      state==evt::DISCARDING
00511      ||state==evt::RAWWRITING
00512      ||state==evt::EMPTY
00513      ||state==evt::STOP
00514      //     ||state==evt::LUMISECTION
00515      ||state==evt::USEDLS
00516      )) std::cout << "=================releaseRawCell state " << state 
00517                   << std::endl;
00518 
00519   assert(
00520          state==evt::DISCARDING
00521          ||state==evt::RAWWRITING
00522          ||state==evt::EMPTY
00523          ||state==evt::STOP
00524          //      ||state==evt::LUMISECTION
00525          ||state==evt::USEDLS
00526          );
00527   setEvtState(cell->index(),evt::EMPTY);
00528   setEvtDiscard(cell->index(),0);
00529   setEvtNumber(cell->index(),0xffffffff);
00530   setEvtPrcId(cell->index(),0);
00531   setEvtTimeStamp(cell->index(),0);
00532   cell->clear();
00533   postRawIndexToWrite(cell->index());
00534   if (segmentationMode_) shmdt(cell);
00535   postRawWrite();
00536 }
00537 
00538 
00539 //______________________________________________________________________________
00540 void FUShmBuffer::writeRawEmptyEvent()
00541 {
00542   FUShmRawCell* cell=rawCellToWrite();
00543   if(cell==0) return;
00544   evt::State_t state=evtState(cell->index());
00545   assert(state==evt::RAWWRITING);
00546   setEvtState(cell->index(),evt::STOP);
00547   cell->setEventTypeStopper();
00548   postRawIndexToRead(cell->index());
00549   if (segmentationMode_) shmdt(cell);
00550   postRawRead();
00551 }
00552 
00553 //______________________________________________________________________________
00554 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls)
00555 {
00556   FUShmRawCell* cell=rawCellToWrite();
00557   if(cell==0) return;
00558   cell->setLumiSection(ls);
00559   evt::State_t state=evtState(cell->index());
00560   assert(state==evt::RAWWRITING);
00561   setEvtState(cell->index(),evt::LUMISECTION);
00562   cell->setEventTypeEol();
00563   postRawIndexToRead(cell->index());
00564   if (segmentationMode_) shmdt(cell);
00565   postRawRead();
00566 }
00567 
00568 
00569 //______________________________________________________________________________
00570 void FUShmBuffer::writeRecoEmptyEvent()
00571 {
00572   waitRecoWrite();
00573   unsigned int   iCell=nextRecoWriteIndex();
00574   FUShmRecoCell* cell =recoCell(iCell);
00575   cell->clear();
00576   postRecoIndexToRead(iCell);
00577   if (segmentationMode_) shmdt(cell);
00578   postRecoRead();
00579 }
00580 
00581 
00582 //______________________________________________________________________________
00583 void FUShmBuffer::writeDqmEmptyEvent()
00584 {
00585   waitDqmWrite();
00586   unsigned int  iCell=nextDqmWriteIndex();
00587   FUShmDqmCell* cell=dqmCell(iCell);
00588   cell->clear();
00589   postDqmIndexToRead(iCell);
00590   if (segmentationMode_) shmdt(cell);
00591   postDqmRead();
00592 }
00593 
00594 
00595 //______________________________________________________________________________
00596 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00597 {
00598   FUShmRawCell* cell=rawCellToWrite();
00599   if(cell==0) return;
00600   rawDiscardIndex_=cell->index();
00601   setEvtState(cell->index(),evt::STOP);
00602   cell->setEventTypeStopper();
00603   setEvtNumber(cell->index(),0xffffffff);
00604   setEvtPrcId(cell->index(),0);
00605   setEvtTimeStamp(cell->index(),0);
00606   postRawDiscarded();
00607 }
00608 
00609 
00610 //______________________________________________________________________________
00611 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00612 {
00613   waitRawDiscard();
00614   if (rawCellReadyForDiscard(cell->index())) {
00615     rawDiscardIndex_=cell->index();
00616     // as this function is called by the reader, the state and type should 
00617     // already be correct
00618     //    setEvtState(cell->index(),evt::STOP);
00619     //    cell->setEventType(evt::STOPPER);
00620     //    setEvtNumber(cell->index(),0xffffffff);
00621     //    setEvtPrcId(cell->index(),0);
00622     //    setEvtTimeStamp(cell->index(),0);
00623     removeClientPrcId(getpid());
00624     if (segmentationMode_) shmdt(cell);
00625     postRawDiscarded();
00626   }
00627   else postRawDiscard();
00628 }
00629 
00630 //______________________________________________________________________________
00631 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell)
00632 {
00633   //  waitRawDiscard();
00634   if (rawCellReadyForDiscard(cell->index())) {
00635     rawDiscardIndex_=cell->index();
00636     //    setEvtState(cell->index(),evt::STOP);
00637     //    cell->setEventType(evt::STOPPER);
00638     //    setEvtNumber(cell->index(),0xffffffff);
00639     //    setEvtPrcId(cell->index(),0);
00640     //    setEvtTimeStamp(cell->index(),0);
00641     //    removeClientPrcId(getpid());
00642     if (segmentationMode_) shmdt(cell);
00643     postRawDiscarded();
00644   }
00645   else postRawDiscard();
00646 }
00647 
00648 
00649 //______________________________________________________________________________
00650 bool FUShmBuffer::writeRecoInitMsg(unsigned int   outModId,
00651                                    unsigned int   fuProcessId,
00652                                    unsigned int   fuGuid,
00653                                    unsigned char *data,
00654                                    unsigned int   dataSize)
00655 {
00656   if (dataSize>recoCellPayloadSize_) {
00657     cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00658     return false;
00659   }
00660   
00661   waitRecoWrite();
00662   unsigned int   iCell=nextRecoWriteIndex();
00663   FUShmRecoCell* cell =recoCell(iCell);
00664   cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00665   postRecoIndexToRead(iCell);
00666   if (segmentationMode_) shmdt(cell);
00667   postRecoRead();
00668   return true;
00669 }
00670 
00671 
00672 //______________________________________________________________________________
00673 bool FUShmBuffer::writeRecoEventData(unsigned int   runNumber,
00674                                      unsigned int   evtNumber,
00675                                      unsigned int   outModId,
00676                                      unsigned int   fuProcessId,
00677                                      unsigned int   fuGuid,
00678                                      unsigned char *data,
00679                                      unsigned int   dataSize)
00680 {
00681   if (dataSize>recoCellPayloadSize_) {
00682     cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00683     return false;
00684   }
00685   
00686   waitRecoWrite();
00687   unsigned int   iCell=nextRecoWriteIndex();
00688   FUShmRecoCell* cell =recoCell(iCell);
00689   unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00690   //evt::State_t state=evtState(rawCellIndex);
00691   //assert(state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT);
00692   setEvtState(rawCellIndex,evt::RECOWRITING);
00693   incEvtDiscard(rawCellIndex);
00694   cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00695                        fuProcessId,fuGuid,data,dataSize);
00696   setEvtState(rawCellIndex,evt::RECOWRITTEN);
00697   postRecoIndexToRead(iCell);
00698   if (segmentationMode_) shmdt(cell);
00699   postRecoRead();
00700   return true;
00701 }
00702 
00703 
00704 //______________________________________________________________________________
00705 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00706                                       unsigned int fuProcessId,
00707                                       unsigned int iRawCell)
00708 {
00709   FUShmRawCell *raw=rawCell(iRawCell);
00710 
00711   unsigned int   dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00712   unsigned char *data    =new unsigned char[dataSize];
00713   uint32_t      *pos     =(uint32_t*)data;
00714   // 06-Oct-2008, KAB - added a version number for the error event format.
00715   //
00716   // Version 1 had no version number, so the run number appeared in the
00717   // first value.  So, a reasonable test for version 1 is whether the
00718   // first value is larger than some relatively small cutoff (say 32).
00719   // Version 2 added the lumi block number.
00720   //
00721   *pos++=(uint32_t)2;  // protocol version number
00722   *pos++=(uint32_t)runNumber;
00723   *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00724   *pos++=(uint32_t)raw->evtNumber();
00725   for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00726   memcpy(pos,raw->payloadAddr(),raw->eventSize());
00727   
00728   // DEBUG
00729   /*
00730     if (1) {
00731     stringstream ss;
00732     ss<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".err";
00733     ofstream fout;
00734     fout.open(ss.str().c_str(),ios::out|ios::binary);
00735     if (!fout.write((char*)data,dataSize))
00736     cout<<"Failed to write error event to "<<ss.str()<<endl;
00737     fout.close();
00738     
00739     stringstream ss2;
00740     ss2<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".info";
00741     ofstream fout2;
00742     fout2.open(ss2.str().c_str());
00743     fout2<<"dataSize = "<<dataSize<<endl;
00744     fout2<<"runNumber = "<<runNumber<<endl;
00745     fout2<<"evtNumber = "<<raw->evtNumber()<<endl;
00746     fout2<<"eventSize = "<<raw->eventSize()<<endl;
00747     unsigned int totalSize(0);
00748     for (unsigned int i=0;i<1024;i++) {
00749     unsigned int fedSize = raw->fedSize(i);
00750     totalSize += fedSize;
00751     if (fedSize>0) fout2<<i<<": "<<fedSize<<endl;
00752     }
00753     fout2<<"totalSize = "<<totalSize<<endl;
00754     fout2.close();
00755     }
00756   */// END DEBUG
00757   
00758   waitRecoWrite();
00759   unsigned int   iRecoCell=nextRecoWriteIndex();
00760   FUShmRecoCell* reco     =recoCell(iRecoCell);
00761   setEvtState(iRawCell,evt::RECOWRITING);
00762   setEvtDiscard(iRawCell,1);
00763   reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00764                         data,dataSize);
00765   delete [] data;
00766   setEvtState(iRawCell,evt::RECOWRITTEN);
00767   postRecoIndexToRead(iRecoCell);
00768   if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00769   postRecoRead();
00770   return true;
00771 }
00772 
00773 
00774 //______________________________________________________________________________
00775 bool FUShmBuffer::writeDqmEventData(unsigned int   runNumber,
00776                                     unsigned int   evtAtUpdate,
00777                                     unsigned int   folderId,
00778                                     unsigned int   fuProcessId,
00779                                     unsigned int   fuGuid,
00780                                     unsigned char *data,
00781                                     unsigned int   dataSize)
00782 {
00783   if (dataSize>dqmCellPayloadSize_) {
00784     cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00785     return false;
00786   }
00787   
00788   waitDqmWrite();
00789   unsigned int  iCell=nextDqmWriteIndex();
00790   FUShmDqmCell* cell=dqmCell(iCell);
00791   dqm::State_t state=dqmState(iCell);
00792   assert(state==dqm::EMPTY);
00793   setDqmState(iCell,dqm::WRITING);
00794   cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00795   setDqmState(iCell,dqm::WRITTEN);
00796   postDqmIndexToRead(iCell);
00797   if (segmentationMode_) shmdt(cell);
00798   postDqmRead();
00799   return true;
00800 }
00801 
00802 
00803 //______________________________________________________________________________
00804 void FUShmBuffer::sem_print()
00805 {
00806   cout<<"--> current sem values:"
00807       <<endl
00808       <<" lock="<<semctl(semid(),0,GETVAL)
00809       <<endl
00810       <<" wraw="<<semctl(semid(),1,GETVAL)
00811       <<" rraw="<<semctl(semid(),2,GETVAL)
00812       <<endl
00813       <<" wdsc="<<semctl(semid(),3,GETVAL)
00814       <<" rdsc="<<semctl(semid(),4,GETVAL)
00815       <<endl
00816       <<" wrec="<<semctl(semid(),5,GETVAL)
00817       <<" rrec="<<semctl(semid(),6,GETVAL)
00818       <<endl
00819       <<" wdqm="<<semctl(semid(),7,GETVAL)
00820       <<" rdqm="<<semctl(semid(),8,GETVAL)
00821       <<endl;
00822 }
00823 
00824 
00825 //______________________________________________________________________________
00826 void FUShmBuffer::printEvtState(unsigned int index)
00827 {
00828   evt::State_t state=evtState(index);
00829   std::string stateName;
00830   if      (state==evt::EMPTY)      stateName="EMPTY";
00831   else if (state==evt::STOP)       stateName="STOP";
00832   else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00833   else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00834   else if (state==evt::RAWREADING) stateName="RAWREADING";
00835   else if (state==evt::RAWREAD)    stateName="RAWREAD";
00836   else if (state==evt::PROCESSING) stateName="PROCESSING";
00837   else if (state==evt::PROCESSED)  stateName="PROCESSED";
00838   else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00839   else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00840   else if (state==evt::SENDING)    stateName="SENDING";
00841   else if (state==evt::SENT)       stateName="SENT";
00842   else if (state==evt::DISCARDING) stateName="DISCARDING";
00843   cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00844 }
00845 
00846 
00847 //______________________________________________________________________________
00848 void FUShmBuffer::printDqmState(unsigned int index)
00849 {
00850   dqm::State_t state=dqmState(index);
00851   cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00852 }
00853 
00854 
00855 //______________________________________________________________________________
00856 FUShmBuffer* FUShmBuffer::createShmBuffer(bool         segmentationMode,
00857                                           unsigned int nRawCells,
00858                                           unsigned int nRecoCells,
00859                                           unsigned int nDqmCells,
00860                                           unsigned int rawCellSize,
00861                                           unsigned int recoCellSize,
00862                                           unsigned int dqmCellSize)
00863 {
00864   // if necessary, release shared memory first!
00865   if (FUShmBuffer::releaseSharedMemory())
00866     cout<<"FUShmBuffer::createShmBuffer: "
00867         <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00868         <<endl;
00869   
00870   // create bookkeeping shared memory segment
00871   int  size =sizeof(unsigned int)*7;
00872   int  shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00873   void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00874   
00875   if(1!=shm_nattch(shmid)) {
00876     cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00877     shmdt(shmAddr);
00878     return 0;
00879   }
00880   
00881   unsigned int* p=(unsigned int*)shmAddr;
00882   *p++=segmentationMode;
00883   *p++=nRawCells;
00884   *p++=nRecoCells;
00885   *p++=nDqmCells;
00886   *p++=rawCellSize;
00887   *p++=recoCellSize;
00888   *p++=dqmCellSize;
00889   shmdt(shmAddr);
00890   
00891   // create the 'real' shared memory buffer
00892   size     =FUShmBuffer::size(segmentationMode,
00893                               nRawCells,nRecoCells,nDqmCells,
00894                               rawCellSize,recoCellSize,dqmCellSize);
00895   shmid    =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00896   int semid=sem_create(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00897   shmAddr  =shm_attach(shmid);                         if (0==shmAddr) return 0;
00898   
00899   if (1!=shm_nattch(shmid)) {
00900     cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00901     shmdt(shmAddr);
00902     return 0;
00903   }
00904   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00905                                                nRawCells,nRecoCells,nDqmCells,
00906                                                rawCellSize,recoCellSize,dqmCellSize);
00907   
00908   cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00909   cout<<"                                segmentationMode="<<segmentationMode<<endl;
00910   
00911   buffer->initialize(shmid,semid);
00912   
00913   return buffer;
00914 }
00915 
00916 
00917 //______________________________________________________________________________
00918 FUShmBuffer* FUShmBuffer::getShmBuffer()
00919 {
00920   // get bookkeeping shared memory segment
00921   int   size   =sizeof(unsigned int)*7;
00922   int   shmid  =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00923   void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00924   
00925   unsigned int *p=(unsigned int*)shmAddr;
00926   bool          segmentationMode=*p++;
00927   unsigned int  nRawCells       =*p++;
00928   unsigned int  nRecoCells      =*p++;
00929   unsigned int  nDqmCells       =*p++;
00930   unsigned int  rawCellSize     =*p++;
00931   unsigned int  recoCellSize    =*p++;
00932   unsigned int  dqmCellSize     =*p++;
00933   shmdt(shmAddr);
00934 
00935   cout<<"FUShmBuffer::getShmBuffer():"
00936       <<" segmentationMode="<<segmentationMode
00937       <<" nRawCells="<<nRawCells
00938       <<" nRecoCells="<<nRecoCells
00939       <<" nDqmCells="<<nDqmCells
00940       <<" rawCellSize="<<rawCellSize
00941       <<" recoCellSize="<<recoCellSize
00942       <<" dqmCellSize="<<dqmCellSize
00943       <<endl;
00944   
00945   // get the 'real' shared memory buffer
00946   size     =FUShmBuffer::size(segmentationMode,
00947                               nRawCells,nRecoCells,nDqmCells,
00948                               rawCellSize,recoCellSize,dqmCellSize);
00949   shmid    =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00950   int semid=sem_get(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00951   shmAddr  =shm_attach(shmid);                      if (0==shmAddr) return 0;
00952   
00953   if (0==shm_nattch(shmid)) {
00954     cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00955     return 0;
00956   }
00957   
00958   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00959                                                nRawCells,nRecoCells,nDqmCells,
00960                                                rawCellSize,recoCellSize,dqmCellSize);
00961   
00962   cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00963   cout<<"                             segmentationMode="<<segmentationMode<<endl;
00964   
00965   buffer->setClientPrcId(getpid());
00966 
00967   return buffer;
00968 }
00969 
00970 
00971 //______________________________________________________________________________
00972 bool FUShmBuffer::releaseSharedMemory()
00973 {
00974   // get bookkeeping shared memory segment
00975   int   size   =sizeof(unsigned int)*7;
00976   int   shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00977   void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00978 
00979   unsigned int*p=(unsigned int*)shmAddr;
00980   bool         segmentationMode=*p++;
00981   unsigned int nRawCells       =*p++;
00982   unsigned int nRecoCells      =*p++;
00983   unsigned int nDqmCells       =*p++;
00984   unsigned int rawCellSize     =*p++;
00985   unsigned int recoCellSize    =*p++;
00986   unsigned int dqmCellSize     =*p++;
00987   shmdt(shmAddr);
00988 
00989   
00990   // get the 'real' shared memory segment
00991   size     =FUShmBuffer::size(segmentationMode,
00992                               nRawCells,nRecoCells,nDqmCells,
00993                               rawCellSize,recoCellSize,dqmCellSize);
00994   int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0)    return false;
00995   int semid=sem_get(FUShmBuffer::getSemKey(),9);   if (semid<0)    return false;
00996   shmAddr  =shm_attach(shmid);                     if (0==shmAddr) return false; 
00997 
00998   int att = 0;
00999   for(; att <10; att++)
01000     {
01001       if(shm_nattch(shmid)>1) {
01002         cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
01003              <<", failed attempt to release shared memory."<<endl;
01004 	::sleep(1);
01005       }
01006       else
01007         break;
01008     }
01009 
01010   if(att>=10) return false;
01011 
01012   if (segmentationMode) {
01013     FUShmBuffer* buffer=
01014       new (shmAddr) FUShmBuffer(segmentationMode,
01015                                 nRawCells,nRecoCells,nDqmCells,
01016                                 rawCellSize,recoCellSize,dqmCellSize);
01017     int cellid;
01018     for (unsigned int i=0;i<nRawCells;i++) {
01019       cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
01020       if ((shm_destroy(cellid)==-1)) return false;
01021     }
01022     for (unsigned int i=0;i<nRecoCells;i++) {
01023       cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
01024       if ((shm_destroy(cellid)==-1)) return false;
01025     }
01026     for (unsigned int i=0;i<nDqmCells;i++) {
01027       cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
01028       if ((shm_destroy(cellid)==-1)) return false;
01029     }
01030   }
01031   shmdt(shmAddr);
01032   if (sem_destroy(semid)==-1)  return false;
01033   if (shm_destroy(shmid)==-1)  return false;
01034   if (shm_destroy(shmidd)==-1) return false;
01035 
01036   return true;
01037 }
01038 
01039 
01040 //______________________________________________________________________________
01041 unsigned int FUShmBuffer::size(bool         segmentationMode,
01042                                unsigned int nRawCells,
01043                                unsigned int nRecoCells,
01044                                unsigned int nDqmCells,
01045                                unsigned int rawCellSize,
01046                                unsigned int recoCellSize,
01047                                unsigned int dqmCellSize)
01048 {
01049   unsigned int offset=
01050     sizeof(FUShmBuffer)+
01051     sizeof(unsigned int)*4*nRawCells+
01052     sizeof(evt::State_t)*nRawCells+
01053     sizeof(dqm::State_t)*nDqmCells;
01054   
01055   unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
01056   unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
01057   unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
01058   
01059   unsigned int realSize =
01060     (segmentationMode) ?
01061     offset
01062     +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
01063     :
01064     offset
01065     +rawCellTotalSize*nRawCells
01066     +recoCellTotalSize*nRecoCells
01067     +dqmCellTotalSize*nDqmCells;
01068 
01069   unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
01070   
01071   return result;
01072 }
01073 
01074 
01075 //______________________________________________________________________________
01076 key_t FUShmBuffer::getShmDescriptorKey()
01077 {
01078   key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
01079   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
01080                              <<"for file "<<shmKeyPath_<<"!"<<endl;
01081   return result;
01082 }
01083 
01084 
01085 //______________________________________________________________________________
01086 key_t FUShmBuffer::getShmKey()
01087 {
01088   key_t result=getuid()*1000+SHM_KEYID;
01089   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
01090                              <<"for file "<<shmKeyPath_<<"!"<<endl;
01091   return result;
01092 }
01093 
01094 
01095 //______________________________________________________________________________
01096 key_t FUShmBuffer::getSemKey()
01097 {
01098   key_t result=getuid()*1000+SEM_KEYID;
01099   if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
01100                              <<"for file "<<semKeyPath_<<"!"<<endl;
01101   return result;
01102 }
01103 
01104 
01105 //______________________________________________________________________________
01106 int FUShmBuffer::shm_create(key_t key,int size)
01107 {
01108   // first check and possibly remove existing segment with same id
01109   int shmid=shmget(key,1,0644);//using minimal size any segment with key "key" will be connected
01110   if(shmid!=-1){
01111     // an existing segment was found, remove it
01112     shmid_ds shmstat;
01113     int result=shmctl(shmid,IPC_STAT,&shmstat);
01114     cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01115          << " created by process " << shmstat.shm_cpid << " owned by "
01116          << shmstat.shm_perm.uid << " permissions " 
01117          << hex << shmstat.shm_perm.mode << dec << endl;
01118     result=shmctl(shmid,IPC_RMID,&shmstat);
01119   }
01120   shmid=shmget(key,size,IPC_CREAT|0644);
01121   if (shmid==-1) {
01122     int err=errno;
01123     cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01124         <<strerror(err)<<endl;
01125   }
01126   return shmid;
01127 }
01128 
01129 
01130 //______________________________________________________________________________
01131 int FUShmBuffer::shm_get(key_t key,int size)
01132 {
01133   int shmid=shmget(key,size,0644);
01134   if (shmid==-1) {
01135     int err=errno;
01136     cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01137         <<strerror(err)<<endl;
01138   }
01139   return shmid;
01140 }
01141 
01142 
01143 //______________________________________________________________________________
01144 void* FUShmBuffer::shm_attach(int shmid)
01145 {
01146   void* result=shmat(shmid,NULL,0);
01147   if (0==result) {
01148     int err=errno;
01149     cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01150         <<strerror(err)<<endl;
01151   }
01152   return result;
01153 }
01154 
01155 
01156 //______________________________________________________________________________
01157 int FUShmBuffer::shm_nattch(int shmid)
01158 {
01159   shmid_ds shmstat;
01160   shmctl(shmid,IPC_STAT,&shmstat);
01161   return shmstat.shm_nattch;
01162 }
01163 
01164 
01165 //______________________________________________________________________________
01166 int FUShmBuffer::shm_destroy(int shmid)
01167 {
01168   shmid_ds shmstat;
01169   int result=shmctl(shmid,IPC_RMID,&shmstat);
01170   if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01171   return result;
01172 }
01173 
01174 
01175 //______________________________________________________________________________
01176 int FUShmBuffer::sem_create(key_t key,int nsem)
01177 {
01178   int semid=semget(key,nsem,IPC_CREAT|0666);
01179   if (semid<0) {
01180     int err=errno;
01181     cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01182         <<strerror(err)<<endl;
01183   }
01184   return semid;
01185 }
01186 
01187 
01188 //______________________________________________________________________________
01189 int FUShmBuffer::sem_get(key_t key,int nsem)
01190 {
01191   int semid=semget(key,nsem,0666);
01192   if (semid<0) {
01193     int err=errno;
01194     cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01195         <<strerror(err)<<endl;
01196   }
01197   return semid;
01198 }
01199 
01200 
01201 //______________________________________________________________________________
01202 int FUShmBuffer::sem_destroy(int semid)
01203 {
01204   int result=semctl(semid,0,IPC_RMID);
01205   if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01206   return result;
01207 }
01208 
01209 
01210 
01212 // implementation of private member functions
01214 
01215 //______________________________________________________________________________
01216 unsigned int FUShmBuffer::nextIndex(unsigned int  offset,
01217                                     unsigned int  nCells,
01218                                     unsigned int& iNext)
01219 {
01220   lock();
01221   unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01222   pindex+=iNext;
01223   iNext=(iNext+1)%nCells;
01224   unsigned int result=*pindex;
01225   unlock();
01226   return result;
01227 }
01228 
01229 
01230 //______________________________________________________________________________
01231 void FUShmBuffer::postIndex(unsigned int  index,
01232                             unsigned int  offset,
01233                             unsigned int  nCells,
01234                             unsigned int& iLast)
01235 {
01236   lock();
01237   unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01238   pindex+=iLast;
01239   *pindex=index;
01240   iLast=(iLast+1)%nCells;
01241   unlock();
01242 }
01243 
01244 
01245 //______________________________________________________________________________
01246 unsigned int FUShmBuffer::nextRawWriteIndex()
01247 {
01248   return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01249 }
01250 
01251 
01252 //______________________________________________________________________________
01253 unsigned int FUShmBuffer::nextRawReadIndex()
01254 {
01255   return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01256 }
01257 
01258 
01259 //______________________________________________________________________________
01260 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01261 {
01262   postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01263 }
01264 
01265 
01266 //______________________________________________________________________________
01267 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01268 {
01269   postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01270 }
01271 
01272 
01273 //______________________________________________________________________________
01274 unsigned int FUShmBuffer::nextRecoWriteIndex()
01275 {
01276   return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01277 }
01278 
01279 
01280 //______________________________________________________________________________
01281 unsigned int FUShmBuffer::nextRecoReadIndex()
01282 {
01283   return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01284 }
01285 
01286 
01287 //______________________________________________________________________________
01288 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01289 {
01290   postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01291 }
01292 
01293 
01294 //______________________________________________________________________________
01295 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01296 {
01297   postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01298 }
01299 
01300 
01301 //______________________________________________________________________________
01302 unsigned int FUShmBuffer::nextDqmWriteIndex()
01303 {
01304   return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01305 }
01306 
01307 
01308 //______________________________________________________________________________
01309 unsigned int FUShmBuffer::nextDqmReadIndex()
01310 {
01311   return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01312 }
01313 
01314 
01315 //______________________________________________________________________________
01316 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01317 {
01318   postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01319 }
01320 
01321 
01322 //______________________________________________________________________________
01323 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01324 {
01325   postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01326 }
01327 
01328 
01329 //______________________________________________________________________________
01330 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01331 {
01332   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01333   for (unsigned int i=0;i<nRawCells_;i++) {
01334     if ((*pevt++)==evtNumber) return i;
01335   }
01336   assert(false);
01337   return 0xffffffff;
01338 }
01339 
01340 
01341 //______________________________________________________________________________
01342 evt::State_t FUShmBuffer::evtState(unsigned int index)
01343 {
01344   assert(index<nRawCells_);
01345   evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01346   pstate+=index;
01347   return *pstate;
01348 }
01349 
01350 
01351 //______________________________________________________________________________
01352 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01353 {
01354   assert(index<nDqmCells_);
01355   dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01356   pstate+=index;
01357   return *pstate;
01358 }
01359 
01360 
01361 //______________________________________________________________________________
01362 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01363 {
01364   assert(index<nRawCells_);
01365   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01366   pevt+=index;
01367   return *pevt;
01368 }
01369 
01370 
01371 //______________________________________________________________________________
01372 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01373 {
01374   assert(index<nRawCells_);
01375   pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01376   prcid+=index;
01377   return *prcid;
01378 }
01379 
01380 
01381 //______________________________________________________________________________
01382 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01383 {
01384   assert(index<nRawCells_);
01385   time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01386   ptstmp+=index;
01387   return *ptstmp;
01388 }
01389 
01390 
01391 //______________________________________________________________________________
01392 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01393 {
01394   assert(index<nClientsMax_);
01395   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01396   prcid+=index;
01397   return *prcid;
01398 }
01399 
01400 
01401 //______________________________________________________________________________
01402 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01403 {
01404   assert(index<nRawCells_);
01405   evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01406   pstate+=index;
01407   lock();
01408   *pstate=state;
01409   unlock();
01410   return true;
01411 }
01412 
01413 
01414 //______________________________________________________________________________
01415 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01416 {
01417   assert(index<nDqmCells_);
01418   dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01419   pstate+=index;
01420   lock();
01421   *pstate=state;
01422   unlock();
01423   return true;
01424 }
01425 
01426 
01427 //______________________________________________________________________________
01428 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01429 {
01430   assert(index<nRawCells_);
01431   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01432   pcount+=index;
01433   lock();
01434   *pcount=discard;
01435   unlock();
01436   return true;
01437 }
01438 
01439 
01440 //______________________________________________________________________________
01441 int FUShmBuffer::incEvtDiscard(unsigned int index)
01442 {
01443   int result = 0;
01444   assert(index<nRawCells_);
01445   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01446   pcount+=index;
01447   lock();
01448   (*pcount)++;
01449   result = *pcount;
01450   unlock();
01451   return result;
01452 }
01453 
01454 
01455 //______________________________________________________________________________
01456 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01457 {
01458   assert(index<nRawCells_);
01459   unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01460   pevt+=index;
01461   lock();
01462   *pevt=evtNumber;
01463   unlock();
01464   return true;
01465 }
01466 
01467 
01468 //______________________________________________________________________________
01469 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01470 {
01471   assert(index<nRawCells_);
01472   pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01473   prcid+=index;
01474   lock();
01475   *prcid=prcId;
01476   unlock();
01477   return true;
01478 }
01479 
01480 
01481 //______________________________________________________________________________
01482 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01483 {
01484   assert(index<nRawCells_);
01485   time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01486   ptstmp+=index;
01487   lock();
01488   *ptstmp=timeStamp;
01489   unlock();
01490   return true;
01491 }
01492 
01493 
01494 //______________________________________________________________________________
01495 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01496 {
01497   lock();
01498   assert(nClients_<nClientsMax_);
01499   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01500   for (unsigned int i=0;i<nClients_;i++) {
01501     if ((*prcid)==prcId) { unlock();  return false; }
01502     prcid++;
01503   }
01504   nClients_++;
01505   *prcid=prcId;
01506   unlock();
01507   return true;
01508 }
01509 
01510 
01511 //______________________________________________________________________________
01512 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01513 {
01514   lock();
01515   pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01516   unsigned int iClient(0);
01517   while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01518   assert(iClient!=nClients_);
01519   pid_t* next=prcid; next++;
01520   while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01521   nClients_--;
01522   unlock();
01523   return true;
01524 }
01525 
01526 
01527 //______________________________________________________________________________
01528 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01529 {
01530   FUShmRawCell* result(0);
01531   
01532   if (iCell>=nRawCells_) {
01533     cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01534         <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01535     return result;
01536   }
01537   
01538   if (segmentationMode_) {
01539     key_t         shmkey  =rawCellShmKey(iCell);
01540     int           shmid   =shm_get(shmkey,rawCellTotalSize_);
01541     void*         cellAddr=shm_attach(shmid);
01542     result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01543   }
01544   else {
01545     result=
01546       (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
01547   }
01548   
01549   return result;
01550 }
01551 
01552 
01553 //______________________________________________________________________________
01554 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01555 {
01556   FUShmRecoCell* result(0);
01557   
01558   if (iCell>=nRecoCells_) {
01559     cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01560         <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01561     return result;
01562   }
01563   
01564   if (segmentationMode_) {
01565     key_t         shmkey  =recoCellShmKey(iCell);
01566     int           shmid   =shm_get(shmkey,recoCellTotalSize_);
01567     void*         cellAddr=shm_attach(shmid);
01568     result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01569   }
01570   else {
01571     result=
01572       (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
01573   }
01574   
01575   return result;
01576 }
01577 
01578 
01579 //______________________________________________________________________________
01580 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01581 {
01582   FUShmDqmCell* result(0);
01583   
01584   if (iCell>=nDqmCells_) {
01585     cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01586         <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01587     return result;
01588   }
01589   
01590   if (segmentationMode_) {
01591     key_t         shmkey  =dqmCellShmKey(iCell);
01592     int           shmid   =shm_get(shmkey,dqmCellTotalSize_);
01593     void*         cellAddr=shm_attach(shmid);
01594     result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01595   }
01596   else {
01597     result=
01598       (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01599   }
01600   
01601   return result;
01602 }
01603 
01604 
01605 //______________________________________________________________________________
01606 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01607 {
01608   assert(index<nRawCells_);
01609   unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01610   pcount+=index;
01611   lock();
01612   assert(*pcount>0);
01613   --(*pcount);
01614   bool result=(*pcount==0);
01615   unlock();
01616   return result;
01617 }
01618 
01619 
01620 //______________________________________________________________________________
01621 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01622 {
01623   if (!segmentationMode_) {
01624     cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01625     return -1;
01626   }
01627   key_t* addr=(key_t*)((unsigned long)this+offset);
01628   for (unsigned int i=0;i<iCell;i++) ++addr;
01629   return *addr;
01630 }
01631 
01632 
01633 //______________________________________________________________________________
01634 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01635 {
01636   if (iCell>=nRawCells_) {
01637     cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01638         <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01639     return -1;
01640   }
01641   return shmKey(iCell,rawCellOffset_);
01642 }
01643 
01644 
01645 //______________________________________________________________________________
01646 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01647 {
01648   if (iCell>=nRecoCells_) {
01649     cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01650         <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01651     return -1;
01652   }
01653   return shmKey(iCell,recoCellOffset_);
01654 }
01655 
01656 
01657 //______________________________________________________________________________
01658 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01659 {
01660   if (iCell>=nDqmCells_) {
01661     cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01662         <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01663     return -1;
01664   }
01665   return shmKey(iCell,dqmCellOffset_);
01666 }
01667 
01668 
01669 //______________________________________________________________________________
01670 void FUShmBuffer::sem_init(int isem,int value)
01671 {
01672   if (semctl(semid(),isem,SETVAL,value)<0) {
01673     cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01674   }
01675 }
01676 
01677 
01678 //______________________________________________________________________________
01679 int FUShmBuffer::sem_wait(int isem)
01680 {
01681   struct sembuf sops[1];
01682   sops[0].sem_num=isem;
01683   sops[0].sem_op =  -1;
01684   sops[0].sem_flg=   0;
01685   if (semop(semid(),sops,1)==-1) {
01686     cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01687     return -1;
01688   }
01689   return 0;
01690 }
01691 
01692 
01693 //______________________________________________________________________________
01694 void FUShmBuffer::sem_post(int isem)
01695 {
01696   struct sembuf sops[1];
01697   sops[0].sem_num=isem;
01698   sops[0].sem_op =   1;
01699   sops[0].sem_flg=   0;
01700   if (semop(semid(),sops,1)==-1) {
01701     cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01702   }
01703 }