00001 #ifndef FUSHMBUFFER_H
00002 #define FUSHMBUFFER_H 1
00003
00004
00005 #include "EventFilter/ShmBuffer/interface/FUShmRawCell.h"
00006 #include "EventFilter/ShmBuffer/interface/FUShmRecoCell.h"
00007 #include "EventFilter/ShmBuffer/interface/FUShmDqmCell.h"
00008
00009
00010 #include <sys/ipc.h>
00011 #include <sys/types.h>
00012 #include <sys/shm.h>
00013 #include <sys/sem.h>
00014 #include <errno.h>
00015
00016
00017 namespace evf {
00018
00019
00020 namespace evt {
00021 enum State_t { EMPTY, STOP, LUMISECTION,
00022 RAWWRITING, RAWWRITTEN,
00023 RAWREADING, RAWREAD,
00024 PROCESSING, PROCESSED,
00025 RECOWRITING,RECOWRITTEN,
00026 SENDING, SENT,
00027 DISCARDING };
00028 }
00029
00030
00031 namespace dqm {
00032 enum State_t { EMPTY,
00033 WRITING, WRITTEN,
00034 SENDING, SENT,
00035 DISCARDING };
00036 }
00037
00038
00039 class FUShmBuffer
00040 {
00041
00042
00043
00044 private:
00045 FUShmBuffer(bool segmentationMode,
00046 unsigned int nRawCells,
00047 unsigned int nRecoCells,
00048 unsigned int nDqmCells,
00049 unsigned int rawCellSize,
00050 unsigned int recoCellSize,
00051 unsigned int dqmCellSize);
00052 public:
00053 ~FUShmBuffer();
00054
00055
00056 public:
00057
00058
00059
00060 void initialize(unsigned int shmid,unsigned int semid);
00061 void reset();
00062
00063 unsigned int nRawCells() const { return nRawCells_; }
00064 unsigned int nRecoCells() const { return nRecoCells_; }
00065 unsigned int nDqmCells() const { return nDqmCells_; }
00066
00067 int shmid() const { return shmid_; }
00068 int semid() const { return semid_; }
00069 unsigned int nClients() const { return nClients_; }
00070
00071 evt::State_t evtState(unsigned int index);
00072 dqm::State_t dqmState(unsigned int index);
00073
00074 unsigned int evtNumber(unsigned int index);
00075 pid_t evtPrcId(unsigned int index);
00076 time_t evtTimeStamp(unsigned int index);
00077 pid_t clientPrcId(unsigned int index);
00078
00079 int nbRawCellsToWrite() const;
00080 int nbRawCellsToRead() const;
00081
00082 FUShmRawCell* rawCellToWrite();
00083 FUShmRawCell* rawCellToRead();
00084 FUShmRecoCell* recoCellToRead();
00085 FUShmDqmCell* dqmCellToRead();
00086 FUShmRawCell* rawCellToDiscard();
00087
00088 void finishWritingRawCell(FUShmRawCell* cell);
00089 void finishReadingRawCell(FUShmRawCell* cell);
00090 void finishReadingRecoCell(FUShmRecoCell* cell);
00091 void finishReadingDqmCell(FUShmDqmCell* cell);
00092
00093 void scheduleRawCellForDiscard(unsigned int iCell);
00094 void scheduleRawCellForDiscardServerSide(unsigned int iCell);
00095
00096 void discardRawCell(FUShmRawCell* cell);
00097 void discardRecoCell(unsigned int iCell);
00098 void discardOrphanedRecoCell(unsigned int iCell);
00099 void discardDqmCell(unsigned int iCell);
00100
00101 void releaseRawCell(FUShmRawCell* cell);
00102
00103 void writeRawEmptyEvent();
00104 void writeRawLumiSectionEvent(unsigned int);
00105 void writeRecoEmptyEvent();
00106 void writeDqmEmptyEvent();
00107
00108 void scheduleRawEmptyCellForDiscard();
00109 void scheduleRawEmptyCellForDiscard(FUShmRawCell* cell);
00110 void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell);
00111
00112 bool writeRecoInitMsg(unsigned int outModId,
00113 unsigned int fuProcessId,
00114 unsigned int fuGuid,
00115 unsigned char *data,
00116 unsigned int dataSize);
00117
00118 bool writeRecoEventData(unsigned int runNumber,
00119 unsigned int evtNumber,
00120 unsigned int outModId,
00121 unsigned int fuProcessId,
00122 unsigned int fuGuid,
00123 unsigned char *data,
00124 unsigned int dataSize);
00125
00126 bool writeErrorEventData(unsigned int runNumber,
00127 unsigned int fuProcessId,
00128 unsigned int iRawCell);
00129
00130 bool writeDqmEventData(unsigned int runNumber,
00131 unsigned int evtAtUpdate,
00132 unsigned int folderId,
00133 unsigned int fuProcessId,
00134 unsigned int fuGuid,
00135 unsigned char *data,
00136 unsigned int dataSize);
00137
00138 void sem_print();
00139 void printEvtState(unsigned int index);
00140 void printDqmState(unsigned int index);
00141
00142
00143
00144
00145
00146 static FUShmBuffer* createShmBuffer(bool semgmentationMode,
00147 unsigned int nRawCells,
00148 unsigned int nRecoCells,
00149 unsigned int nDqmCells,
00150 unsigned int rawCellSize =0x400000,
00151 unsigned int recoCellSize=0x400000,
00152 unsigned int dqmCellSize =0x400000);
00153
00154 static FUShmBuffer* getShmBuffer();
00155
00156 static bool releaseSharedMemory();
00157
00158 static unsigned int size(bool segmentationMode,
00159 unsigned int nRawCells,
00160 unsigned int nRecoCells,
00161 unsigned int nDqmCells,
00162 unsigned int rawCellSize,
00163 unsigned int recoCellSize,
00164 unsigned int dqmCellSize);
00165
00166
00167 static key_t getShmDescriptorKey();
00168 static key_t getShmKey();
00169 static key_t getSemKey();
00170
00171 static int shm_create(key_t key,int size);
00172 static int shm_get(key_t key,int size);
00173 static void* shm_attach(int shmid);
00174 static int shm_nattch(int shmid);
00175 static int shm_destroy(int shmid);
00176
00177 static int sem_create(key_t key,int nsem);
00178 static int sem_get(key_t key,int nsem);
00179 static int sem_destroy(int semid);
00180
00181
00182 private:
00183
00184
00185
00186 unsigned int nextIndex(unsigned int offset,
00187 unsigned int nCells,
00188 unsigned int& iNext);
00189 void postIndex(unsigned int index,
00190 unsigned int offset,
00191 unsigned int nCells,
00192 unsigned int& iLast);
00193
00194 unsigned int nextRawWriteIndex();
00195 unsigned int nextRawReadIndex();
00196 void postRawIndexToWrite(unsigned int index);
00197 void postRawIndexToRead(unsigned int index);
00198
00199 unsigned int nextRecoWriteIndex();
00200 unsigned int nextRecoReadIndex();
00201 void postRecoIndexToWrite(unsigned int index);
00202 void postRecoIndexToRead(unsigned int index);
00203
00204 unsigned int nextDqmWriteIndex();
00205 unsigned int nextDqmReadIndex();
00206 void postDqmIndexToWrite(unsigned int index);
00207 void postDqmIndexToRead(unsigned int index);
00208
00209 unsigned int indexForEvtNumber(unsigned int evtNumber);
00210
00211 public:
00212 bool setEvtState(unsigned int index,evt::State_t state);
00213 bool setDqmState(unsigned int index,dqm::State_t state);
00214 private:
00215 bool setEvtDiscard(unsigned int index,unsigned int discard);
00216 int incEvtDiscard(unsigned int index);
00217 bool setEvtNumber(unsigned int index,unsigned int evtNumber);
00218 bool setEvtPrcId(unsigned int index,pid_t prcId);
00219 bool setEvtTimeStamp(unsigned int index,time_t timeStamp);
00220
00221 bool setClientPrcId(pid_t prcId);
00222 public:
00223 bool removeClientPrcId(pid_t prcId);
00224
00225 FUShmRawCell* rawCell(unsigned int iCell);
00226 FUShmRecoCell* recoCell(unsigned int iCell);
00227 FUShmDqmCell* dqmCell(unsigned int iCell);
00228
00229 private:
00230
00231 bool rawCellReadyForDiscard(unsigned int index);
00232
00233 key_t shmKey(unsigned int iCell,unsigned int offset);
00234 key_t rawCellShmKey(unsigned int iCell);
00235 key_t recoCellShmKey(unsigned int iCell);
00236 key_t dqmCellShmKey(unsigned int iCell);
00237
00238 void sem_init(int isem,int value);
00239 int sem_wait(int isem);
00240 void sem_post(int isem);
00241
00242 public:
00243 void lock() { sem_wait(0); }
00244 void unlock() { sem_post(0); }
00245 private:
00246 void waitRawWrite() { sem_wait(1); }
00247 void postRawWrite() { sem_post(1); }
00248 void waitRawRead() { sem_wait(2); }
00249 void postRawRead() { sem_post(2); }
00250 void waitRawDiscard() { sem_wait(3); }
00251 void postRawDiscard() { sem_post(3); }
00252 void waitRawDiscarded() { sem_wait(4); }
00253 void postRawDiscarded() { sem_post(4); }
00254 void waitRecoWrite() { sem_wait(5); }
00255 void postRecoWrite() { sem_post(5); }
00256 void waitRecoRead() { sem_wait(6); }
00257 void postRecoRead() { sem_post(6); }
00258 void waitDqmWrite() { sem_wait(7); }
00259 void postDqmWrite() { sem_post(7); }
00260 void waitDqmRead() { sem_wait(8); }
00261 void postDqmRead() { sem_post(8); }
00262
00263
00264 private:
00265
00266
00267
00268 bool segmentationMode_;
00269 int shmid_;
00270 int semid_;
00271
00272 unsigned int rawWriteNext_;
00273 unsigned int rawWriteLast_;
00274 unsigned int rawWriteOffset_;
00275 unsigned int rawReadNext_;
00276 unsigned int rawReadLast_;
00277 unsigned int rawReadOffset_;
00278 unsigned int rawDiscardIndex_;
00279
00280 unsigned int recoWriteNext_;
00281 unsigned int recoWriteLast_;
00282 unsigned int recoWriteOffset_;
00283 unsigned int recoReadNext_;
00284 unsigned int recoReadLast_;
00285 unsigned int recoReadOffset_;
00286
00287 unsigned int dqmWriteNext_;
00288 unsigned int dqmWriteLast_;
00289 unsigned int dqmWriteOffset_;
00290 unsigned int dqmReadNext_;
00291 unsigned int dqmReadLast_;
00292 unsigned int dqmReadOffset_;
00293
00294 unsigned int evtStateOffset_;
00295 unsigned int evtDiscardOffset_;
00296 unsigned int evtNumberOffset_;
00297 unsigned int evtPrcIdOffset_;
00298 unsigned int evtTimeStampOffset_;
00299 unsigned int dqmStateOffset_;
00300
00301 unsigned int nClients_;
00302 unsigned int nClientsMax_;
00303 unsigned int clientPrcIdOffset_;
00304
00305 unsigned int nRawCells_;
00306 unsigned int rawCellPayloadSize_;
00307 unsigned int rawCellTotalSize_;
00308 unsigned int rawCellOffset_;
00309
00310 unsigned int recoWriteIndex_;
00311 unsigned int recoReadIndex_;
00312 unsigned int nRecoCells_;
00313 unsigned int recoCellPayloadSize_;
00314 unsigned int recoCellTotalSize_;
00315 unsigned int recoCellOffset_;
00316
00317 unsigned int dqmWriteIndex_;
00318 unsigned int dqmReadIndex_;
00319 unsigned int nDqmCells_;
00320 unsigned int dqmCellPayloadSize_;
00321 unsigned int dqmCellTotalSize_;
00322 unsigned int dqmCellOffset_;
00323
00324 static const char* shmKeyPath_;
00325 static const char* semKeyPath_;
00326
00327 };
00328
00329
00330 }
00331
00332
00333 #endif