00001 #ifndef FUSHMBUFFER_H
00002 #define FUSHMBUFFER_H 1
00003
00004
00005 #include "EventFilter/ShmBuffer/interface/FUShmRawCell.h"
00006 #include "EventFilter/ShmBuffer/interface/FUShmRecoCell.h"
00007 #include "EventFilter/ShmBuffer/interface/FUShmDqmCell.h"
00008
00009
00010 #include <sys/ipc.h>
00011 #include <sys/types.h>
00012 #include <sys/shm.h>
00013 #include <sys/sem.h>
00014 #include <errno.h>
00015
00016 #include <string>
00017
00018 namespace evf {
00019
00020
// State tags for event cells. FUShmBuffer::evtState() returns one of these
// and FUShmBuffer::setEvtState() accepts one. The numeric values are the
// ones the compiler assigned implicitly (0, 1, 2, ...); they are written
// out here only for readability.
namespace evt {
  enum State_t {
    EMPTY       =  0,
    STOP        =  1,
    RAWWRITING  =  2,
    RAWWRITTEN  =  3,
    RAWREADING  =  4,
    RAWREAD     =  5,
    PROCESSING  =  6,
    PROCESSED   =  7,
    RECOWRITING =  8,
    RECOWRITTEN =  9,
    SENDING     = 10,
    SENT        = 11,
    LUMISECTION = 12,
    USEDLS      = 13,
    DISCARDING  = 14
  };
}
00031
00032
// State tags for dqm cells. FUShmBuffer::dqmState() returns one of these
// and FUShmBuffer::setDqmState() accepts one. The numeric values are the
// ones the compiler assigned implicitly; they are written out here only
// for readability.
namespace dqm {
  enum State_t {
    EMPTY      = 0,
    WRITING    = 1,
    WRITTEN    = 2,
    SENDING    = 3,
    SENT       = 4,
    DISCARDING = 5
  };
}
00039
00040
00041 class FUShmBuffer
00042 {
00043
00044
00045
00046 private:
00047 FUShmBuffer(bool segmentationMode,
00048 unsigned int nRawCells,
00049 unsigned int nRecoCells,
00050 unsigned int nDqmCells,
00051 unsigned int rawCellSize,
00052 unsigned int recoCellSize,
00053 unsigned int dqmCellSize);
00054 public:
00055 ~FUShmBuffer();
00056
00057
00058 public:
00059
00060
00061
00062 void initialize(unsigned int shmid,unsigned int semid);
00063 void reset(bool);
00064
00065 unsigned int nRawCells() const { return nRawCells_; }
00066 unsigned int nRecoCells() const { return nRecoCells_; }
00067 unsigned int nDqmCells() const { return nDqmCells_; }
00068
00069 int shmid() const { return shmid_; }
00070 int semid() const { return semid_; }
00071 unsigned int nClients() const { return nClients_; }
00072
00073 evt::State_t evtState(unsigned int index);
00074 dqm::State_t dqmState(unsigned int index);
00075
00076 unsigned int evtNumber(unsigned int index);
00077 pid_t evtPrcId(unsigned int index);
00078 time_t evtTimeStamp(unsigned int index);
00079 pid_t clientPrcId(unsigned int index);
00080
00081 unsigned int nbRawCellsToWrite() const;
00082 int nbRawCellsToRead() const;
00083
00084 FUShmRawCell* rawCellToWrite();
00085 FUShmRawCell* rawCellToRead();
00086 FUShmRecoCell* recoCellToRead();
00087 FUShmDqmCell* dqmCellToRead();
00088 FUShmRawCell* rawCellToDiscard();
00089
00090 void finishWritingRawCell(FUShmRawCell* cell);
00091 void finishReadingRawCell(FUShmRawCell* cell);
00092 void finishReadingRecoCell(FUShmRecoCell* cell);
00093 void finishReadingDqmCell(FUShmDqmCell* cell);
00094
00095 void scheduleRawCellForDiscard(unsigned int iCell);
00096 void scheduleRawCellForDiscardServerSide(unsigned int iCell);
00097
00098 void discardRawCell(FUShmRawCell* cell);
00099 void discardRecoCell(unsigned int iCell);
00100 void discardOrphanedRecoCell(unsigned int iCell);
00101 void discardDqmCell(unsigned int iCell);
00102
00103 void releaseRawCell(FUShmRawCell* cell);
00104
00105 void writeRawEmptyEvent();
00106 void writeRawLumiSectionEvent(unsigned int);
00107 void writeRecoEmptyEvent();
00108 void writeDqmEmptyEvent();
00109
00110 void scheduleRawEmptyCellForDiscard();
00111 bool scheduleRawEmptyCellForDiscard(FUShmRawCell* cell, bool &pidstatus);
00112 void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell);
00113
00114 bool writeRecoInitMsg(unsigned int outModId,
00115 unsigned int fuProcessId,
00116 unsigned int fuGuid,
00117 unsigned char *data,
00118 unsigned int dataSize,
00119 unsigned int nExpectedEPs);
00120
00121 bool writeRecoEventData(unsigned int runNumber,
00122 unsigned int evtNumber,
00123 unsigned int outModId,
00124 unsigned int fuProcessId,
00125 unsigned int fuGuid,
00126 unsigned char *data,
00127 unsigned int dataSize);
00128
00129 bool writeErrorEventData(unsigned int runNumber,
00130 unsigned int fuProcessId,
00131 unsigned int iRawCell,
00132 bool checkValue);
00133
00134 bool writeDqmEventData(unsigned int runNumber,
00135 unsigned int evtAtUpdate,
00136 unsigned int folderId,
00137 unsigned int fuProcessId,
00138 unsigned int fuGuid,
00139 unsigned char *data,
00140 unsigned int dataSize);
00141
00142 void sem_print();
00143 std::string sem_print_s();
00144 void printEvtState(unsigned int index);
00145 void printDqmState(unsigned int index);
00146
00147
00148
00149
00150
00151 static FUShmBuffer* createShmBuffer(bool semgmentationMode,
00152 unsigned int nRawCells,
00153 unsigned int nRecoCells,
00154 unsigned int nDqmCells,
00155 unsigned int rawCellSize =0x400000,
00156 unsigned int recoCellSize=0x400000,
00157 unsigned int dqmCellSize =0x400000);
00158
00159 static FUShmBuffer* getShmBuffer();
00160
00161 static bool releaseSharedMemory();
00162
00163 static unsigned int size(bool segmentationMode,
00164 unsigned int nRawCells,
00165 unsigned int nRecoCells,
00166 unsigned int nDqmCells,
00167 unsigned int rawCellSize,
00168 unsigned int recoCellSize,
00169 unsigned int dqmCellSize);
00170
00171
00172 static key_t getShmDescriptorKey();
00173 static key_t getShmKey();
00174 static key_t getSemKey();
00175
00176 static int shm_create(key_t key,int size);
00177 static int shm_get(key_t key,int size);
00178 static void* shm_attach(int shmid);
00179 static int shm_nattch(int shmid);
00180 static int shm_destroy(int shmid);
00181
00182 static int sem_create(key_t key,int nsem);
00183 static int sem_get(key_t key,int nsem);
00184 static int sem_destroy(int semid);
00185
00186
00187 private:
00188
00189
00190
00191 unsigned int nextIndex(unsigned int offset,
00192 unsigned int nCells,
00193 unsigned int& iNext);
00194 void postIndex(unsigned int index,
00195 unsigned int offset,
00196 unsigned int nCells,
00197 unsigned int& iLast);
00198
00199 unsigned int nextRawWriteIndex();
00200 unsigned int nextRawReadIndex();
00201 void postRawIndexToWrite(unsigned int index);
00202 void postRawIndexToRead(unsigned int index);
00203
00204 unsigned int nextRecoWriteIndex();
00205 unsigned int nextRecoReadIndex();
00206 void postRecoIndexToWrite(unsigned int index);
00207 void postRecoIndexToRead(unsigned int index);
00208
00209 unsigned int nextDqmWriteIndex();
00210 unsigned int nextDqmReadIndex();
00211 void postDqmIndexToWrite(unsigned int index);
00212 void postDqmIndexToRead(unsigned int index);
00213
00214 unsigned int indexForEvtNumber(unsigned int evtNumber);
00215 unsigned int indexForEvtPrcId(pid_t evtNumber);
00216
00217 public:
00218 bool setEvtState(unsigned int index,evt::State_t state, bool lockShm=true);
00219 bool setDqmState(unsigned int index,dqm::State_t state);
00220 bool setEvtDiscard(unsigned int index,unsigned int discard, bool checkValue=false, bool lockShm=true);
00221 int incEvtDiscard(unsigned int index, bool lockShm=true);
00222 private:
00223 bool setEvtNumber(unsigned int index,unsigned int evtNumber);
00224 bool setEvtTimeStamp(unsigned int index,time_t timeStamp);
00225
00226 bool setClientPrcId(pid_t prcId);
00227 public:
00228 bool setEvtPrcId(unsigned int index,pid_t prcId);
00229 bool removeClientPrcId(pid_t prcId);
00230
00231 FUShmRawCell* rawCell(unsigned int iCell);
00232 FUShmRecoCell* recoCell(unsigned int iCell);
00233 FUShmDqmCell* dqmCell(unsigned int iCell);
00234
00235 private:
00236
00237 bool rawCellReadyForDiscard(unsigned int index);
00238
00239 key_t shmKey(unsigned int iCell,unsigned int offset);
00240 key_t rawCellShmKey(unsigned int iCell);
00241 key_t recoCellShmKey(unsigned int iCell);
00242 key_t dqmCellShmKey(unsigned int iCell);
00243
00244 void sem_init(int isem,int value);
00245 int sem_wait(int isem);
00246 void sem_post(int isem);
00247
00248 public:
00249 void lock() { sem_wait(0); }
00250 void unlock() { sem_post(0); }
00251 private:
00252 int waitRawWrite() { return sem_wait(1); }
00253 void postRawWrite() { sem_post(1); }
00254 void waitRawRead() { sem_wait(2); }
00255 void postRawRead() { sem_post(2); }
00256 void waitRawDiscard() { sem_wait(3); }
00257 void postRawDiscard() { sem_post(3); }
00258 void waitRawDiscarded() { sem_wait(4); }
00259 void postRawDiscarded() { sem_post(4); }
00260 void waitRecoWrite() { sem_wait(5); }
00261 void postRecoWrite() { sem_post(5); }
00262 void waitRecoRead() { sem_wait(6); }
00263 void postRecoRead() { sem_post(6); }
00264 void waitDqmWrite() { sem_wait(7); }
00265 void postDqmWrite() { sem_post(7); }
00266 void waitDqmRead() { sem_wait(8); }
00267 void postDqmRead() { sem_post(8); }
00268
00269
00270 private:
00271
00272
00273
00274 bool segmentationMode_;
00275 int shmid_;
00276 int semid_;
00277
00278 unsigned int rawWriteNext_;
00279 unsigned int rawWriteLast_;
00280 unsigned int rawWriteOffset_;
00281 unsigned int rawReadNext_;
00282 unsigned int rawReadLast_;
00283 unsigned int rawReadOffset_;
00284 unsigned int rawDiscardIndex_;
00285
00286 unsigned int recoWriteNext_;
00287 unsigned int recoWriteLast_;
00288 unsigned int recoWriteOffset_;
00289 unsigned int recoReadNext_;
00290 unsigned int recoReadLast_;
00291 unsigned int recoReadOffset_;
00292
00293 unsigned int dqmWriteNext_;
00294 unsigned int dqmWriteLast_;
00295 unsigned int dqmWriteOffset_;
00296 unsigned int dqmReadNext_;
00297 unsigned int dqmReadLast_;
00298 unsigned int dqmReadOffset_;
00299
00300 unsigned int evtStateOffset_;
00301 unsigned int evtDiscardOffset_;
00302 unsigned int evtNumberOffset_;
00303 unsigned int evtPrcIdOffset_;
00304 unsigned int evtTimeStampOffset_;
00305 unsigned int dqmStateOffset_;
00306
00307 unsigned int nClients_;
00308 unsigned int nClientsMax_;
00309 unsigned int clientPrcIdOffset_;
00310
00311 unsigned int nRawCells_;
00312 unsigned int rawCellPayloadSize_;
00313 unsigned int rawCellTotalSize_;
00314 unsigned int rawCellOffset_;
00315
00316 unsigned int recoWriteIndex_;
00317 unsigned int recoReadIndex_;
00318 unsigned int nRecoCells_;
00319 unsigned int recoCellPayloadSize_;
00320 unsigned int recoCellTotalSize_;
00321 unsigned int recoCellOffset_;
00322
00323 unsigned int dqmWriteIndex_;
00324 unsigned int dqmReadIndex_;
00325 unsigned int nDqmCells_;
00326 unsigned int dqmCellPayloadSize_;
00327 unsigned int dqmCellTotalSize_;
00328 unsigned int dqmCellOffset_;
00329
00330 static const char* shmKeyPath_;
00331 static const char* semKeyPath_;
00332
00333 };
00334
00335
00336 }
00337
00338
00339 #endif