00001 #ifndef FUSHMBUFFER_H
00002 #define FUSHMBUFFER_H 1
00003
00004
00005 #include "EventFilter/ShmBuffer/interface/FUShmRawCell.h"
00006 #include "EventFilter/ShmBuffer/interface/FUShmRecoCell.h"
00007 #include "EventFilter/ShmBuffer/interface/FUShmDqmCell.h"
00008
00009
00010 #include <sys/ipc.h>
00011 #include <sys/types.h>
00012 #include <sys/shm.h>
00013 #include <sys/sem.h>
00014 #include <errno.h>
00015
00016
00017 namespace evf {
00018
00019
// Per-event cell states. Values are pinned explicitly so the numeric encoding
// stays fixed even if an enumerator is later inserted in the middle.
// NOTE(review): the meaning of each state is inferred from its name only;
// confirm against the FUShmBuffer implementation.
namespace evt {
  enum State_t {
    EMPTY       =  0,
    STOP        =  1,
    // raw cell: write side
    RAWWRITING  =  2,
    RAWWRITTEN  =  3,
    // raw cell: read side
    RAWREADING  =  4,
    RAWREAD     =  5,
    // event processing
    PROCESSING  =  6,
    PROCESSED   =  7,
    // reco cell: write side
    RECOWRITING =  8,
    RECOWRITTEN =  9,
    // shipping
    SENDING     = 10,
    SENT        = 11,
    // lumi-section bookkeeping
    LUMISECTION = 12,
    USEDLS      = 13,
    // teardown
    DISCARDING  = 14
  };
}
00030
00031
// Per-DQM-cell states. Values are pinned explicitly so the numeric encoding
// stays fixed even if an enumerator is later inserted in the middle.
// NOTE(review): the meaning of each state is inferred from its name only;
// confirm against the FUShmBuffer implementation.
namespace dqm {
  enum State_t {
    EMPTY      = 0,
    // write side
    WRITING    = 1,
    WRITTEN    = 2,
    // shipping
    SENDING    = 3,
    SENT       = 4,
    // teardown
    DISCARDING = 5
  };
}
00038
00039
00040 class FUShmBuffer
00041 {
00042
00043
00044
00045 private:
00046 FUShmBuffer(bool segmentationMode,
00047 unsigned int nRawCells,
00048 unsigned int nRecoCells,
00049 unsigned int nDqmCells,
00050 unsigned int rawCellSize,
00051 unsigned int recoCellSize,
00052 unsigned int dqmCellSize);
00053 public:
00054 ~FUShmBuffer();
00055
00056
00057 public:
00058
00059
00060
00061 void initialize(unsigned int shmid,unsigned int semid);
00062 void reset();
00063
00064 unsigned int nRawCells() const { return nRawCells_; }
00065 unsigned int nRecoCells() const { return nRecoCells_; }
00066 unsigned int nDqmCells() const { return nDqmCells_; }
00067
00068 int shmid() const { return shmid_; }
00069 int semid() const { return semid_; }
00070 unsigned int nClients() const { return nClients_; }
00071
00072 evt::State_t evtState(unsigned int index);
00073 dqm::State_t dqmState(unsigned int index);
00074
00075 unsigned int evtNumber(unsigned int index);
00076 pid_t evtPrcId(unsigned int index);
00077 time_t evtTimeStamp(unsigned int index);
00078 pid_t clientPrcId(unsigned int index);
00079
00080 int nbRawCellsToWrite() const;
00081 int nbRawCellsToRead() const;
00082
00083 FUShmRawCell* rawCellToWrite();
00084 FUShmRawCell* rawCellToRead();
00085 FUShmRecoCell* recoCellToRead();
00086 FUShmDqmCell* dqmCellToRead();
00087 FUShmRawCell* rawCellToDiscard();
00088
00089 void finishWritingRawCell(FUShmRawCell* cell);
00090 void finishReadingRawCell(FUShmRawCell* cell);
00091 void finishReadingRecoCell(FUShmRecoCell* cell);
00092 void finishReadingDqmCell(FUShmDqmCell* cell);
00093
00094 void scheduleRawCellForDiscard(unsigned int iCell);
00095 void scheduleRawCellForDiscardServerSide(unsigned int iCell);
00096
00097 void discardRawCell(FUShmRawCell* cell);
00098 void discardRecoCell(unsigned int iCell);
00099 void discardOrphanedRecoCell(unsigned int iCell);
00100 void discardDqmCell(unsigned int iCell);
00101
00102 void releaseRawCell(FUShmRawCell* cell);
00103
00104 void writeRawEmptyEvent();
00105 void writeRawLumiSectionEvent(unsigned int);
00106 void writeRecoEmptyEvent();
00107 void writeDqmEmptyEvent();
00108
00109 void scheduleRawEmptyCellForDiscard();
00110 void scheduleRawEmptyCellForDiscard(FUShmRawCell* cell);
00111 void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell);
00112
00113 bool writeRecoInitMsg(unsigned int outModId,
00114 unsigned int fuProcessId,
00115 unsigned int fuGuid,
00116 unsigned char *data,
00117 unsigned int dataSize);
00118
00119 bool writeRecoEventData(unsigned int runNumber,
00120 unsigned int evtNumber,
00121 unsigned int outModId,
00122 unsigned int fuProcessId,
00123 unsigned int fuGuid,
00124 unsigned char *data,
00125 unsigned int dataSize);
00126
00127 bool writeErrorEventData(unsigned int runNumber,
00128 unsigned int fuProcessId,
00129 unsigned int iRawCell);
00130
00131 bool writeDqmEventData(unsigned int runNumber,
00132 unsigned int evtAtUpdate,
00133 unsigned int folderId,
00134 unsigned int fuProcessId,
00135 unsigned int fuGuid,
00136 unsigned char *data,
00137 unsigned int dataSize);
00138
00139 void sem_print();
00140 void printEvtState(unsigned int index);
00141 void printDqmState(unsigned int index);
00142
00143
00144
00145
00146
00147 static FUShmBuffer* createShmBuffer(bool semgmentationMode,
00148 unsigned int nRawCells,
00149 unsigned int nRecoCells,
00150 unsigned int nDqmCells,
00151 unsigned int rawCellSize =0x400000,
00152 unsigned int recoCellSize=0x400000,
00153 unsigned int dqmCellSize =0x400000);
00154
00155 static FUShmBuffer* getShmBuffer();
00156
00157 static bool releaseSharedMemory();
00158
00159 static unsigned int size(bool segmentationMode,
00160 unsigned int nRawCells,
00161 unsigned int nRecoCells,
00162 unsigned int nDqmCells,
00163 unsigned int rawCellSize,
00164 unsigned int recoCellSize,
00165 unsigned int dqmCellSize);
00166
00167
00168 static key_t getShmDescriptorKey();
00169 static key_t getShmKey();
00170 static key_t getSemKey();
00171
00172 static int shm_create(key_t key,int size);
00173 static int shm_get(key_t key,int size);
00174 static void* shm_attach(int shmid);
00175 static int shm_nattch(int shmid);
00176 static int shm_destroy(int shmid);
00177
00178 static int sem_create(key_t key,int nsem);
00179 static int sem_get(key_t key,int nsem);
00180 static int sem_destroy(int semid);
00181
00182
00183 private:
00184
00185
00186
00187 unsigned int nextIndex(unsigned int offset,
00188 unsigned int nCells,
00189 unsigned int& iNext);
00190 void postIndex(unsigned int index,
00191 unsigned int offset,
00192 unsigned int nCells,
00193 unsigned int& iLast);
00194
00195 unsigned int nextRawWriteIndex();
00196 unsigned int nextRawReadIndex();
00197 void postRawIndexToWrite(unsigned int index);
00198 void postRawIndexToRead(unsigned int index);
00199
00200 unsigned int nextRecoWriteIndex();
00201 unsigned int nextRecoReadIndex();
00202 void postRecoIndexToWrite(unsigned int index);
00203 void postRecoIndexToRead(unsigned int index);
00204
00205 unsigned int nextDqmWriteIndex();
00206 unsigned int nextDqmReadIndex();
00207 void postDqmIndexToWrite(unsigned int index);
00208 void postDqmIndexToRead(unsigned int index);
00209
00210 unsigned int indexForEvtNumber(unsigned int evtNumber);
00211
00212 public:
00213 bool setEvtState(unsigned int index,evt::State_t state);
00214 bool setDqmState(unsigned int index,dqm::State_t state);
00215 private:
00216 bool setEvtDiscard(unsigned int index,unsigned int discard);
00217 int incEvtDiscard(unsigned int index);
00218 bool setEvtNumber(unsigned int index,unsigned int evtNumber);
00219 bool setEvtPrcId(unsigned int index,pid_t prcId);
00220 bool setEvtTimeStamp(unsigned int index,time_t timeStamp);
00221
00222 bool setClientPrcId(pid_t prcId);
00223 public:
00224 bool removeClientPrcId(pid_t prcId);
00225
00226 FUShmRawCell* rawCell(unsigned int iCell);
00227 FUShmRecoCell* recoCell(unsigned int iCell);
00228 FUShmDqmCell* dqmCell(unsigned int iCell);
00229
00230 private:
00231
00232 bool rawCellReadyForDiscard(unsigned int index);
00233
00234 key_t shmKey(unsigned int iCell,unsigned int offset);
00235 key_t rawCellShmKey(unsigned int iCell);
00236 key_t recoCellShmKey(unsigned int iCell);
00237 key_t dqmCellShmKey(unsigned int iCell);
00238
00239 void sem_init(int isem,int value);
00240 int sem_wait(int isem);
00241 void sem_post(int isem);
00242
00243 public:
00244 void lock() { sem_wait(0); }
00245 void unlock() { sem_post(0); }
00246 private:
00247 void waitRawWrite() { sem_wait(1); }
00248 void postRawWrite() { sem_post(1); }
00249 void waitRawRead() { sem_wait(2); }
00250 void postRawRead() { sem_post(2); }
00251 void waitRawDiscard() { sem_wait(3); }
00252 void postRawDiscard() { sem_post(3); }
00253 void waitRawDiscarded() { sem_wait(4); }
00254 void postRawDiscarded() { sem_post(4); }
00255 void waitRecoWrite() { sem_wait(5); }
00256 void postRecoWrite() { sem_post(5); }
00257 void waitRecoRead() { sem_wait(6); }
00258 void postRecoRead() { sem_post(6); }
00259 void waitDqmWrite() { sem_wait(7); }
00260 void postDqmWrite() { sem_post(7); }
00261 void waitDqmRead() { sem_wait(8); }
00262 void postDqmRead() { sem_post(8); }
00263
00264
00265 private:
00266
00267
00268
00269 bool segmentationMode_;
00270 int shmid_;
00271 int semid_;
00272
00273 unsigned int rawWriteNext_;
00274 unsigned int rawWriteLast_;
00275 unsigned int rawWriteOffset_;
00276 unsigned int rawReadNext_;
00277 unsigned int rawReadLast_;
00278 unsigned int rawReadOffset_;
00279 unsigned int rawDiscardIndex_;
00280
00281 unsigned int recoWriteNext_;
00282 unsigned int recoWriteLast_;
00283 unsigned int recoWriteOffset_;
00284 unsigned int recoReadNext_;
00285 unsigned int recoReadLast_;
00286 unsigned int recoReadOffset_;
00287
00288 unsigned int dqmWriteNext_;
00289 unsigned int dqmWriteLast_;
00290 unsigned int dqmWriteOffset_;
00291 unsigned int dqmReadNext_;
00292 unsigned int dqmReadLast_;
00293 unsigned int dqmReadOffset_;
00294
00295 unsigned int evtStateOffset_;
00296 unsigned int evtDiscardOffset_;
00297 unsigned int evtNumberOffset_;
00298 unsigned int evtPrcIdOffset_;
00299 unsigned int evtTimeStampOffset_;
00300 unsigned int dqmStateOffset_;
00301
00302 unsigned int nClients_;
00303 unsigned int nClientsMax_;
00304 unsigned int clientPrcIdOffset_;
00305
00306 unsigned int nRawCells_;
00307 unsigned int rawCellPayloadSize_;
00308 unsigned int rawCellTotalSize_;
00309 unsigned int rawCellOffset_;
00310
00311 unsigned int recoWriteIndex_;
00312 unsigned int recoReadIndex_;
00313 unsigned int nRecoCells_;
00314 unsigned int recoCellPayloadSize_;
00315 unsigned int recoCellTotalSize_;
00316 unsigned int recoCellOffset_;
00317
00318 unsigned int dqmWriteIndex_;
00319 unsigned int dqmReadIndex_;
00320 unsigned int nDqmCells_;
00321 unsigned int dqmCellPayloadSize_;
00322 unsigned int dqmCellTotalSize_;
00323 unsigned int dqmCellOffset_;
00324
00325 static const char* shmKeyPath_;
00326 static const char* semKeyPath_;
00327
00328 };
00329
00330
00331 }
00332
00333
00334 #endif