00001 #ifndef FUSHMBUFFER_H
00002 #define FUSHMBUFFER_H 1
00003
00004
00005 #include "EventFilter/ShmBuffer/interface/FUShmRawCell.h"
00006 #include "EventFilter/ShmBuffer/interface/FUShmRecoCell.h"
00007 #include "EventFilter/ShmBuffer/interface/FUShmDqmCell.h"
00008
00009
00010 #include <sys/ipc.h>
00011 #include <sys/types.h>
00012 #include <sys/shm.h>
00013 #include <sys/sem.h>
00014 #include <errno.h>
00015
00016
00017 namespace evf {
00018
00019
00020 namespace evt {
00021 enum State_t { EMPTY, STOP,
00022 RAWWRITING, RAWWRITTEN,
00023 RAWREADING, RAWREAD,
00024 PROCESSING, PROCESSED,
00025 RECOWRITING,RECOWRITTEN,
00026 SENDING, SENT,
00027 LUMISECTION, USEDLS,
00028 DISCARDING };
00029 }
00030
00031
00032 namespace dqm {
00033 enum State_t { EMPTY,
00034 WRITING, WRITTEN,
00035 SENDING, SENT,
00036 DISCARDING };
00037 }
00038
00039
00040 class FUShmBuffer
00041 {
00042
00043
00044
00045 private:
00046 FUShmBuffer(bool segmentationMode,
00047 unsigned int nRawCells,
00048 unsigned int nRecoCells,
00049 unsigned int nDqmCells,
00050 unsigned int rawCellSize,
00051 unsigned int recoCellSize,
00052 unsigned int dqmCellSize);
00053 public:
00054 ~FUShmBuffer();
00055
00056
00057 public:
00058
00059
00060
00061 void initialize(unsigned int shmid,unsigned int semid);
00062 void reset();
00063
00064 unsigned int nRawCells() const { return nRawCells_; }
00065 unsigned int nRecoCells() const { return nRecoCells_; }
00066 unsigned int nDqmCells() const { return nDqmCells_; }
00067
00068 int shmid() const { return shmid_; }
00069 int semid() const { return semid_; }
00070 unsigned int nClients() const { return nClients_; }
00071
00072 evt::State_t evtState(unsigned int index);
00073 dqm::State_t dqmState(unsigned int index);
00074
00075 unsigned int evtNumber(unsigned int index);
00076 pid_t evtPrcId(unsigned int index);
00077 time_t evtTimeStamp(unsigned int index);
00078 pid_t clientPrcId(unsigned int index);
00079
00080 unsigned int nbRawCellsToWrite() const;
00081 int nbRawCellsToRead() const;
00082
00083 FUShmRawCell* rawCellToWrite();
00084 FUShmRawCell* rawCellToRead();
00085 FUShmRecoCell* recoCellToRead();
00086 FUShmDqmCell* dqmCellToRead();
00087 FUShmRawCell* rawCellToDiscard();
00088
00089 void finishWritingRawCell(FUShmRawCell* cell);
00090 void finishReadingRawCell(FUShmRawCell* cell);
00091 void finishReadingRecoCell(FUShmRecoCell* cell);
00092 void finishReadingDqmCell(FUShmDqmCell* cell);
00093
00094 void scheduleRawCellForDiscard(unsigned int iCell);
00095 void scheduleRawCellForDiscardServerSide(unsigned int iCell);
00096
00097 void discardRawCell(FUShmRawCell* cell);
00098 void discardRecoCell(unsigned int iCell);
00099 void discardOrphanedRecoCell(unsigned int iCell);
00100 void discardDqmCell(unsigned int iCell);
00101
00102 void releaseRawCell(FUShmRawCell* cell);
00103
00104 void writeRawEmptyEvent();
00105 void writeRawLumiSectionEvent(unsigned int);
00106 void writeRecoEmptyEvent();
00107 void writeDqmEmptyEvent();
00108
00109 void scheduleRawEmptyCellForDiscard();
00110 void scheduleRawEmptyCellForDiscard(FUShmRawCell* cell);
00111 void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell);
00112
00113 bool writeRecoInitMsg(unsigned int outModId,
00114 unsigned int fuProcessId,
00115 unsigned int fuGuid,
00116 unsigned char *data,
00117 unsigned int dataSize,
00118 unsigned int nExpectedEPs);
00119
00120 bool writeRecoEventData(unsigned int runNumber,
00121 unsigned int evtNumber,
00122 unsigned int outModId,
00123 unsigned int fuProcessId,
00124 unsigned int fuGuid,
00125 unsigned char *data,
00126 unsigned int dataSize);
00127
00128 bool writeErrorEventData(unsigned int runNumber,
00129 unsigned int fuProcessId,
00130 unsigned int iRawCell,
00131 bool checkValue=false);
00132
00133 bool writeDqmEventData(unsigned int runNumber,
00134 unsigned int evtAtUpdate,
00135 unsigned int folderId,
00136 unsigned int fuProcessId,
00137 unsigned int fuGuid,
00138 unsigned char *data,
00139 unsigned int dataSize);
00140
00141 void sem_print();
00142 void printEvtState(unsigned int index);
00143 void printDqmState(unsigned int index);
00144
00145
00146
00147
00148
00149 static FUShmBuffer* createShmBuffer(bool semgmentationMode,
00150 unsigned int nRawCells,
00151 unsigned int nRecoCells,
00152 unsigned int nDqmCells,
00153 unsigned int rawCellSize =0x400000,
00154 unsigned int recoCellSize=0x400000,
00155 unsigned int dqmCellSize =0x400000);
00156
00157 static FUShmBuffer* getShmBuffer();
00158
00159 static bool releaseSharedMemory();
00160
00161 static unsigned int size(bool segmentationMode,
00162 unsigned int nRawCells,
00163 unsigned int nRecoCells,
00164 unsigned int nDqmCells,
00165 unsigned int rawCellSize,
00166 unsigned int recoCellSize,
00167 unsigned int dqmCellSize);
00168
00169
00170 static key_t getShmDescriptorKey();
00171 static key_t getShmKey();
00172 static key_t getSemKey();
00173
00174 static int shm_create(key_t key,int size);
00175 static int shm_get(key_t key,int size);
00176 static void* shm_attach(int shmid);
00177 static int shm_nattch(int shmid);
00178 static int shm_destroy(int shmid);
00179
00180 static int sem_create(key_t key,int nsem);
00181 static int sem_get(key_t key,int nsem);
00182 static int sem_destroy(int semid);
00183
00184
00185 private:
00186
00187
00188
00189 unsigned int nextIndex(unsigned int offset,
00190 unsigned int nCells,
00191 unsigned int& iNext);
00192 void postIndex(unsigned int index,
00193 unsigned int offset,
00194 unsigned int nCells,
00195 unsigned int& iLast);
00196
00197 unsigned int nextRawWriteIndex();
00198 unsigned int nextRawReadIndex();
00199 void postRawIndexToWrite(unsigned int index);
00200 void postRawIndexToRead(unsigned int index);
00201
00202 unsigned int nextRecoWriteIndex();
00203 unsigned int nextRecoReadIndex();
00204 void postRecoIndexToWrite(unsigned int index);
00205 void postRecoIndexToRead(unsigned int index);
00206
00207 unsigned int nextDqmWriteIndex();
00208 unsigned int nextDqmReadIndex();
00209 void postDqmIndexToWrite(unsigned int index);
00210 void postDqmIndexToRead(unsigned int index);
00211
00212 unsigned int indexForEvtNumber(unsigned int evtNumber);
00213
00214 public:
00215 bool setEvtState(unsigned int index,evt::State_t state);
00216 bool setDqmState(unsigned int index,dqm::State_t state);
00217 bool setEvtDiscard(unsigned int index,unsigned int discard, bool checkValue=false);
00218 int incEvtDiscard(unsigned int index);
00219 private:
00220 bool setEvtNumber(unsigned int index,unsigned int evtNumber);
00221 bool setEvtPrcId(unsigned int index,pid_t prcId);
00222 bool setEvtTimeStamp(unsigned int index,time_t timeStamp);
00223
00224 bool setClientPrcId(pid_t prcId);
00225 public:
00226 bool removeClientPrcId(pid_t prcId);
00227
00228 FUShmRawCell* rawCell(unsigned int iCell);
00229 FUShmRecoCell* recoCell(unsigned int iCell);
00230 FUShmDqmCell* dqmCell(unsigned int iCell);
00231
00232 private:
00233
00234 bool rawCellReadyForDiscard(unsigned int index);
00235
00236 key_t shmKey(unsigned int iCell,unsigned int offset);
00237 key_t rawCellShmKey(unsigned int iCell);
00238 key_t recoCellShmKey(unsigned int iCell);
00239 key_t dqmCellShmKey(unsigned int iCell);
00240
00241 void sem_init(int isem,int value);
00242 int sem_wait(int isem);
00243 void sem_post(int isem);
00244
00245 public:
00246 void lock() { sem_wait(0); }
00247 void unlock() { sem_post(0); }
00248 private:
00249 int waitRawWrite() { return sem_wait(1); }
00250 void postRawWrite() { sem_post(1); }
00251 void waitRawRead() { sem_wait(2); }
00252 void postRawRead() { sem_post(2); }
00253 void waitRawDiscard() { sem_wait(3); }
00254 void postRawDiscard() { sem_post(3); }
00255 void waitRawDiscarded() { sem_wait(4); }
00256 void postRawDiscarded() { sem_post(4); }
00257 void waitRecoWrite() { sem_wait(5); }
00258 void postRecoWrite() { sem_post(5); }
00259 void waitRecoRead() { sem_wait(6); }
00260 void postRecoRead() { sem_post(6); }
00261 void waitDqmWrite() { sem_wait(7); }
00262 void postDqmWrite() { sem_post(7); }
00263 void waitDqmRead() { sem_wait(8); }
00264 void postDqmRead() { sem_post(8); }
00265
00266
00267 private:
00268
00269
00270
00271 bool segmentationMode_;
00272 int shmid_;
00273 int semid_;
00274
00275 unsigned int rawWriteNext_;
00276 unsigned int rawWriteLast_;
00277 unsigned int rawWriteOffset_;
00278 unsigned int rawReadNext_;
00279 unsigned int rawReadLast_;
00280 unsigned int rawReadOffset_;
00281 unsigned int rawDiscardIndex_;
00282
00283 unsigned int recoWriteNext_;
00284 unsigned int recoWriteLast_;
00285 unsigned int recoWriteOffset_;
00286 unsigned int recoReadNext_;
00287 unsigned int recoReadLast_;
00288 unsigned int recoReadOffset_;
00289
00290 unsigned int dqmWriteNext_;
00291 unsigned int dqmWriteLast_;
00292 unsigned int dqmWriteOffset_;
00293 unsigned int dqmReadNext_;
00294 unsigned int dqmReadLast_;
00295 unsigned int dqmReadOffset_;
00296
00297 unsigned int evtStateOffset_;
00298 unsigned int evtDiscardOffset_;
00299 unsigned int evtNumberOffset_;
00300 unsigned int evtPrcIdOffset_;
00301 unsigned int evtTimeStampOffset_;
00302 unsigned int dqmStateOffset_;
00303
00304 unsigned int nClients_;
00305 unsigned int nClientsMax_;
00306 unsigned int clientPrcIdOffset_;
00307
00308 unsigned int nRawCells_;
00309 unsigned int rawCellPayloadSize_;
00310 unsigned int rawCellTotalSize_;
00311 unsigned int rawCellOffset_;
00312
00313 unsigned int recoWriteIndex_;
00314 unsigned int recoReadIndex_;
00315 unsigned int nRecoCells_;
00316 unsigned int recoCellPayloadSize_;
00317 unsigned int recoCellTotalSize_;
00318 unsigned int recoCellOffset_;
00319
00320 unsigned int dqmWriteIndex_;
00321 unsigned int dqmReadIndex_;
00322 unsigned int nDqmCells_;
00323 unsigned int dqmCellPayloadSize_;
00324 unsigned int dqmCellTotalSize_;
00325 unsigned int dqmCellOffset_;
00326
00327 static const char* shmKeyPath_;
00328 static const char* semKeyPath_;
00329
00330 };
00331
00332
00333 }
00334
00335
00336 #endif