// Numeric id associated with the shared-memory *descriptor* segment.
// NOTE(review): presumably the project id handed to ftok() when building the
// descriptor key (see getShmDescriptorKey) — the call site is not visible
// here, confirm before relying on it.
30 #define SHM_DESCRIPTOR_KEYID 1
// Key-file path for the shared-memory side of the buffer.
// Takes the value of the FUSHM_KEYFILE environment variable, falling back to
// "/dev/null" when the variable is not set. The pointer returned by getenv()
// is stored as-is (no copy is made), and getenv() is queried twice: once for
// the null check and once for the value.
// NOTE(review): presumably consumed by the ftok()-based getShm*Key() helpers —
// confirm against their definitions.
42 const char* FUShmBuffer::shmKeyPath_ =
43 (getenv(
"FUSHM_KEYFILE") ==
NULL ?
"/dev/null" : getenv(
"FUSHM_KEYFILE"));
// Key-file path for the semaphore side of the buffer.
// Takes the value of the FUSEM_KEYFILE environment variable, falling back to
// "/dev/null" when the variable is not set. The pointer returned by getenv()
// is stored as-is (no copy is made), and getenv() is queried twice: once for
// the null check and once for the value.
// NOTE(review): presumably consumed by the ftok()-based getSemKey() helper —
// confirm against its definition.
44 const char* FUShmBuffer::semKeyPath_ =
45 (getenv(
"FUSEM_KEYFILE") ==
NULL ?
"/dev/null" : getenv(
"FUSEM_KEYFILE"));
53 FUShmBuffer::FUShmBuffer(
bool segmentationMode,
54 unsigned int nRawCells,
55 unsigned int nRecoCells,
56 unsigned int nDqmCells,
57 unsigned int rawCellSize,
58 unsigned int recoCellSize,
59 unsigned int dqmCellSize)
60 : segmentationMode_(segmentationMode)
62 , nRawCells_(nRawCells)
63 , rawCellPayloadSize_(rawCellSize)
64 , nRecoCells_(nRecoCells)
65 , recoCellPayloadSize_(recoCellSize)
66 , nDqmCells_(nDqmCells)
67 , dqmCellPayloadSize_(dqmCellSize)
246 unsigned int *iWrite,*iRead;
251 for (
unsigned int i=0;
i<
nRawCells_;
i++) { *iWrite++=
i; *iRead++ =0xffffffff; }
256 for (
unsigned int i=0;
i<
nRecoCells_;
i++) { *iWrite++=
i; *iRead++ =0xffffffff; }
261 for (
unsigned int i=0;
i<
nDqmCells_;
i++) { *iWrite++=
i; *iRead++ =0xffffffff; }
278 return semctl(
semid(),1,GETVAL);
285 return semctl(
semid(),2,GETVAL);
515 ))
std::cout <<
"=================releaseRawCell state " << state
639 unsigned int fuProcessId,
642 unsigned int dataSize)
645 cout<<
"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
652 cell->
writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
662 unsigned int evtNumber,
663 unsigned int outModId,
664 unsigned int fuProcessId,
667 unsigned int dataSize)
670 cout<<
"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
683 fuProcessId,fuGuid,data,dataSize);
694 unsigned int fuProcessId,
695 unsigned int iRawCell)
699 unsigned int dataSize=
sizeof(uint32_t)*(4+1024)+raw->
eventSize();
700 unsigned char *
data =
new unsigned char[dataSize];
701 uint32_t *
pos =(uint32_t*)data;
710 *pos++=(uint32_t)runNumber;
713 for (
unsigned int i=0;
i<1024;
i++) *pos++ = (uint32_t)raw->
fedSize(
i);
764 unsigned int evtAtUpdate,
765 unsigned int folderId,
766 unsigned int fuProcessId,
769 unsigned int dataSize)
772 cout<<
"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
782 cell->
writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
794 cout<<
"--> current sem values:"
796 <<
" lock="<<semctl(
semid(),0,GETVAL)
798 <<
" wraw="<<semctl(
semid(),1,GETVAL)
799 <<
" rraw="<<semctl(
semid(),2,GETVAL)
801 <<
" wdsc="<<semctl(
semid(),3,GETVAL)
802 <<
" rdsc="<<semctl(
semid(),4,GETVAL)
804 <<
" wrec="<<semctl(
semid(),5,GETVAL)
805 <<
" rrec="<<semctl(
semid(),6,GETVAL)
807 <<
" wdqm="<<semctl(
semid(),7,GETVAL)
808 <<
" rdqm="<<semctl(
semid(),8,GETVAL)
817 std::string stateName;
819 else if (state==
evt::STOP) stateName=
"STOP";
829 else if (state==
evt::SENT) stateName=
"SENT";
831 cout<<
"evt "<<index<<
" in state '"<<stateName<<
"'."<<endl;
839 cout<<
"dqm evt "<<index<<
" in state '"<<state<<
"'."<<endl;
845 unsigned int nRawCells,
846 unsigned int nRecoCells,
847 unsigned int nDqmCells,
848 unsigned int rawCellSize,
849 unsigned int recoCellSize,
850 unsigned int dqmCellSize)
854 cout<<
"FUShmBuffer::createShmBuffer: "
855 <<
"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
859 int size =
sizeof(
unsigned int)*7;
861 void*shmAddr=
shm_attach(shmid);
if(0==shmAddr)
return 0;
864 cout<<
"FUShmBuffer::createShmBuffer() FAILED: nattch="<<
shm_nattch(shmid)<<endl;
869 unsigned int*
p=(
unsigned int*)shmAddr;
870 *p++=segmentationMode;
881 nRawCells,nRecoCells,nDqmCells,
882 rawCellSize,recoCellSize,dqmCellSize);
885 shmAddr =
shm_attach(shmid);
if (0==shmAddr)
return 0;
888 cout<<
"FUShmBuffer::createShmBuffer FAILED: nattch="<<
shm_nattch(shmid)<<endl;
893 nRawCells,nRecoCells,nDqmCells,
894 rawCellSize,recoCellSize,dqmCellSize);
896 cout<<
"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
897 cout<<
" segmentationMode="<<segmentationMode<<endl;
909 int size =
sizeof(
unsigned int)*7;
911 void* shmAddr=
shm_attach(shmid);
if (0==shmAddr)
return 0;
913 unsigned int *
p=(
unsigned int*)shmAddr;
914 bool segmentationMode=*p++;
918 unsigned int rawCellSize =*p++;
919 unsigned int recoCellSize =*p++;
920 unsigned int dqmCellSize =*p++;
923 cout<<
"FUShmBuffer::getShmBuffer():"
924 <<
" segmentationMode="<<segmentationMode
925 <<
" nRawCells="<<nRawCells
926 <<
" nRecoCells="<<nRecoCells
927 <<
" nDqmCells="<<nDqmCells
928 <<
" rawCellSize="<<rawCellSize
929 <<
" recoCellSize="<<recoCellSize
930 <<
" dqmCellSize="<<dqmCellSize
935 nRawCells,nRecoCells,nDqmCells,
936 rawCellSize,recoCellSize,dqmCellSize);
939 shmAddr =
shm_attach(shmid);
if (0==shmAddr)
return 0;
942 cout<<
"FUShmBuffer::getShmBuffer() FAILED: nattch="<<
shm_nattch(shmid)<<endl;
947 nRawCells,nRecoCells,nDqmCells,
948 rawCellSize,recoCellSize,dqmCellSize);
950 cout<<
"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
951 cout<<
" segmentationMode="<<segmentationMode<<endl;
963 int size =
sizeof(
unsigned int)*7;
965 void* shmAddr=
shm_attach(shmidd);
if (0==shmAddr)
return false;
967 unsigned int*
p=(
unsigned int*)shmAddr;
968 bool segmentationMode=*p++;
972 unsigned int rawCellSize =*p++;
973 unsigned int recoCellSize =*p++;
974 unsigned int dqmCellSize =*p++;
980 nRawCells,nRecoCells,nDqmCells,
981 rawCellSize,recoCellSize,dqmCellSize);
984 shmAddr =
shm_attach(shmid);
if (0==shmAddr)
return false;
987 for(; att <10; att++)
990 cout << att <<
" FUShmBuffer::releaseSharedMemory(): nattch="<<
shm_nattch(shmid)
991 <<
", failed attempt to release shared memory."<<endl;
998 if(att>=10)
return false;
1000 if (segmentationMode) {
1003 nRawCells,nRecoCells,nDqmCells,
1004 rawCellSize,recoCellSize,dqmCellSize);
1030 unsigned int nRawCells,
1031 unsigned int nRecoCells,
1032 unsigned int nDqmCells,
1033 unsigned int rawCellSize,
1034 unsigned int recoCellSize,
1035 unsigned int dqmCellSize)
1039 sizeof(
unsigned int)*4*nRawCells+
1047 unsigned int realSize =
1048 (segmentationMode) ?
1050 +
sizeof(key_t)*(nRawCells+nRecoCells+
nDqmCells)
1053 +rawCellTotalSize*nRawCells
1054 +recoCellTotalSize*nRecoCells
1055 +dqmCellTotalSize*nDqmCells;
1057 unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
1067 if (result==(key_t)-1)
cout<<
"FUShmBuffer::getShmDescriptorKey: failed "
1077 if (result==(key_t)-1)
cout<<
"FUShmBuffer::getShmKey: ftok() failed "
1087 if (result==(key_t)-1)
cout<<
"FUShmBuffer::getSemKey: ftok() failed "
1097 int shmid=shmget(key,1,0644);
1101 int result=shmctl(shmid,IPC_STAT,&shmstat);
1102 cout <<
"FUShmBuffer found segment for key 0x " << hex << key << dec
1103 <<
" created by process " << shmstat.shm_cpid <<
" owned by "
1104 << shmstat.shm_perm.uid <<
" permissions "
1105 << hex << shmstat.shm_perm.mode << dec << endl;
1106 result=shmctl(shmid,IPC_RMID,&shmstat);
1108 shmid=shmget(key,size,IPC_CREAT|0644);
1111 cout<<
"FUShmBuffer::shm_create("<<key<<
","<<size<<
") failed: "
1112 <<strerror(err)<<endl;
1121 int shmid=shmget(key,size,0644);
1124 cout<<
"FUShmBuffer::shm_get("<<key<<
","<<size<<
") failed: "
1125 <<strerror(err)<<endl;
1137 cout<<
"FUShmBuffer::shm_attach("<<shmid<<
") failed: "
1138 <<strerror(err)<<endl;
1148 shmctl(shmid,IPC_STAT,&shmstat);
1149 return shmstat.shm_nattch;
1157 int result=shmctl(shmid,IPC_RMID,&shmstat);
1158 if (result==-1)
cout<<
"FUShmBuffer::shm_destroy(shmid="<<shmid<<
") failed."<<endl;
1166 int semid=semget(key,nsem,IPC_CREAT|0666);
1169 cout<<
"FUShmBuffer::sem_create(key="<<key<<
",nsem="<<nsem<<
") failed: "
1170 <<strerror(err)<<endl;
1179 int semid=semget(key,nsem,0666);
1182 cout<<
"FUShmBuffer::sem_get(key="<<key<<
",nsem="<<nsem<<
") failed: "
1183 <<strerror(err)<<endl;
1192 int result=semctl(semid,0,IPC_RMID);
1193 if (result==-1)
cout<<
"FUShmBuffer::sem_destroy(semid="<<semid<<
") failed."<<endl;
1205 unsigned int nCells,
1206 unsigned int& iNext)
1209 unsigned int* pindex=(
unsigned int*)((
unsigned long)
this+
offset);
1211 iNext=(iNext+1)%nCells;
1212 unsigned int result=*pindex;
1221 unsigned int nCells,
1222 unsigned int& iLast)
1225 unsigned int* pindex=(
unsigned int*)((
unsigned long)
this+
offset);
1228 iLast=(iLast+1)%nCells;
1489 if ((*prcid)==prcId) {
unlock();
return false; }
1504 unsigned int iClient(0);
1505 while (iClient<=
nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
1507 pid_t* next=prcid; next++;
1508 while (iClient<
nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
1521 cout<<
"FUShmBuffer::rawCell("<<iCell<<
") ERROR: "
1522 <<
"iCell="<<iCell<<
" >= nRawCells()="<<
nRawCells_<<endl;
1547 cout<<
"FUShmBuffer::recoCell("<<iCell<<
") ERROR: "
1548 <<
"iCell="<<iCell<<
" >= nRecoCells="<<
nRecoCells_<<endl;
1573 cout<<
"FUShmBuffer::dqmCell("<<iCell<<
") ERROR: "
1574 <<
"iCell="<<iCell<<
" >= nDqmCells="<<
nDqmCells_<<endl;
1602 bool result=(*pcount==0);
1612 cout<<
"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
1615 key_t* addr=(key_t*)((
unsigned long)
this+
offset);
1616 for (
unsigned int i=0;
i<iCell;
i++) ++addr;
1625 cout<<
"FUShmBuffer::rawCellShmKey() ERROR: "
1626 <<
"iCell>=nRawCells: "<<iCell<<
">="<<
nRawCells_<<endl;
1637 cout<<
"FUShmBuffer::recoCellShmKey() ERROR: "
1638 <<
"iCell>=nRecoCells: "<<iCell<<
">="<<
nRecoCells_<<endl;
1649 cout<<
"FUShmBuffer::dqmCellShmKey() ERROR: "
1650 <<
"iCell>=nDqmCells: "<<iCell<<
">="<<
nDqmCells_<<endl;
1660 if (semctl(
semid(),isem,SETVAL,value)<0) {
1661 cout<<
"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
1669 struct sembuf sops[1];
1670 sops[0].sem_num=isem;
1671 sops[0].sem_op = -1;
1673 if (semop(
semid(),sops,1)==-1) {
1674 cout<<
"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
1684 struct sembuf sops[1];
1685 sops[0].sem_num=isem;
1688 if (semop(
semid(),sops,1)==-1) {
1689 cout<<
"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
void discardRecoCell(unsigned int iCell)
unsigned int clientPrcIdOffset_
key_t recoCellShmKey(unsigned int iCell)
FUShmRawCell * rawCellToRead()
FUShmRawCell * rawCellToDiscard()
int incEvtDiscard(unsigned int index)
void postRawIndexToRead(unsigned int index)
unsigned int nRecoCells() const
void writeRawLumiSectionEvent(unsigned int)
static int shm_nattch(int shmid)
unsigned int index() const
void initialize(unsigned int index)
bool setEvtPrcId(unsigned int index, pid_t prcId)
unsigned int dqmWriteNext_
static void * shm_attach(int shmid)
void postDqmIndexToWrite(unsigned int index)
unsigned int rawReadOffset_
void initialize(unsigned int index)
static unsigned int size(unsigned int payloadSize)
unsigned int nextRecoReadIndex()
unsigned int dqmReadLast_
unsigned int recoCellPayloadSize_
unsigned int dqmReadOffset_
bool writeRecoEventData(unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
unsigned int nextIndex(unsigned int offset, unsigned int nCells, unsigned int &iNext)
bool writeDqmEventData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
void scheduleRawCellForDiscardServerSide(unsigned int iCell)
unsigned int dqmWriteLast_
static FUShmBuffer * getShmBuffer()
int nbRawCellsToRead() const
void printDqmState(unsigned int index)
void releaseRawCell(FUShmRawCell *cell)
unsigned int evtDiscardOffset_
bool writeRecoInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
static const char * semKeyPath_
unsigned int dqmStateOffset_
pid_t clientPrcId(unsigned int index)
static key_t getShmDescriptorKey()
static int shm_get(key_t key, int size)
unsigned int dqmWriteOffset_
unsigned int recoWriteLast_
static bool releaseSharedMemory()
static int sem_create(key_t key, int nsem)
bool removeClientPrcId(pid_t prcId)
static unsigned int size(bool segmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize, unsigned int recoCellSize, unsigned int dqmCellSize)
FUShmDqmCell * dqmCellToRead()
unsigned int evtNumberOffset_
unsigned int dqmCellOffset_
evt::State_t evtState(unsigned int index)
void discardDqmCell(unsigned int iCell)
unsigned int evtNumber() const
unsigned int nextDqmReadIndex()
time_t evtTimeStamp(unsigned int index)
unsigned int evtNumber(unsigned int index)
unsigned int recoWriteNext_
void initialize(unsigned int shmid, unsigned int semid)
void writeRecoEmptyEvent()
bool setEvtNumber(unsigned int index, unsigned int evtNumber)
void postDqmIndexToRead(unsigned int index)
unsigned int evtStateOffset_
static unsigned int size(unsigned int payloadSize)
unsigned int rawCellIndex() const
void initialize(unsigned int index)
void discardOrphanedRecoCell(unsigned int iCell)
bool setClientPrcId(pid_t prcId)
unsigned int getlbn(const unsigned char *)
void discardRawCell(FUShmRawCell *cell)
void postRawIndexToWrite(unsigned int index)
unsigned int nextRawReadIndex()
unsigned char * payloadAddr() const
FUShmRawCell * rawCell(unsigned int iCell)
unsigned int recoReadOffset_
bool setEvtState(unsigned int index, evt::State_t state)
static const char * shmKeyPath_
unsigned int rawWriteNext_
pid_t evtPrcId(unsigned int index)
void finishWritingRawCell(FUShmRawCell *cell)
key_t rawCellShmKey(unsigned int iCell)
static int shm_destroy(int shmid)
unsigned int nRawCells() const
FUShmRecoCell * recoCell(unsigned int iCell)
void writeRawEmptyEvent()
bool rawCellReadyForDiscard(unsigned int index)
unsigned int offset(bool)
unsigned char * fedAddr(unsigned int i) const
unsigned int dqmCellPayloadSize_
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
void finishReadingDqmCell(FUShmDqmCell *cell)
void scheduleRawEmptyCellForDiscard()
unsigned int recoReadLast_
void finishReadingRecoCell(FUShmRecoCell *cell)
unsigned int nextDqmWriteIndex()
void writeErrorEvent(unsigned int rawCellIndex, unsigned int runNumber, unsigned int evtNumber, unsigned int fuProcessId, unsigned char *data, unsigned int dataSize)
key_t dqmCellShmKey(unsigned int iCell)
dqm::State_t dqmState(unsigned int index)
unsigned int rawReadNext_
unsigned int rawCellPayloadSize_
unsigned int nextRecoWriteIndex()
unsigned int eventSize() const
unsigned int rawDiscardIndex_
unsigned int nextRawWriteIndex()
FUShmRawCell * rawCellToWrite()
unsigned int rawReadLast_
unsigned int rawWriteOffset_
unsigned int dqmCellTotalSize_
bool setEvtTimeStamp(unsigned int index, time_t timeStamp)
unsigned int rawWriteLast_
void writeDqmEmptyEvent()
unsigned int dqmReadNext_
unsigned int recoCellTotalSize_
void postRecoIndexToRead(unsigned int index)
void writeInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
unsigned int index() const
void setLumiSection(unsigned int)
unsigned int indexForEvtNumber(unsigned int evtNumber)
void printEvtState(unsigned int index)
unsigned int nClientsMax_
static int sem_get(key_t key, int nsem)
FUShmRecoCell * recoCellToRead()
int nbRawCellsToWrite() const
unsigned int rawCellTotalSize_
FUShmBuffer(bool segmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize, unsigned int recoCellSize, unsigned int dqmCellSize)
void writeData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
static int sem_destroy(int semid)
#define SHM_DESCRIPTOR_KEYID
static int shm_create(key_t key, int size)
unsigned int nDqmCells() const
unsigned int fedSize(unsigned int i) const
void sem_init(int isem, int value)
unsigned int evtPrcIdOffset_
unsigned int recoWriteOffset_
void writeEventData(unsigned int rawCellIndex, unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
static unsigned int size(unsigned int payloadSize)
unsigned int rawCellOffset_
unsigned int recoReadNext_
bool writeErrorEventData(unsigned int runNumber, unsigned int fuProcessId, unsigned int iRawCell)
bool setEvtDiscard(unsigned int index, unsigned int discard)
key_t shmKey(unsigned int iCell, unsigned int offset)
void finishReadingRawCell(FUShmRawCell *cell)
tuple size
Write out results.
unsigned int recoCellOffset_
std::string timeStamp(TimePoint_t)
void postRecoIndexToWrite(unsigned int index)
bool setDqmState(unsigned int index, dqm::State_t state)
void postIndex(unsigned int index, unsigned int offset, unsigned int nCells, unsigned int &iLast)
FUShmDqmCell * dqmCell(unsigned int iCell)
void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell *cell)
unsigned int evtTimeStampOffset_
void scheduleRawCellForDiscard(unsigned int iCell)