CMS 3D CMS Logo

FedRawDataInputSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
2 #define EventFilter_Utilities_FedRawDataInputSource_h
3 
4 #include <condition_variable>
5 #include <cstdio>
6 #include <filesystem>
7 #include <memory>
8 #include <mutex>
9 #include <thread>
10 
11 #include "oneapi/tbb/concurrent_queue.h"
12 #include "oneapi/tbb/concurrent_vector.h"
13 
22 
25 
27 class InputSourceDescription;
28 class ParameterSet;
29 
30 struct InputFile;
31 struct InputChunk;
32 
33 namespace evf {
35  namespace FastMonState {
36  enum InputState : short;
37  }
38 } // namespace evf
39 
41  friend struct InputFile;
42  friend struct InputChunk;
43 
44 public:
46  ~FedRawDataInputSource() override;
47  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
48 
49  std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
50 
51 protected:
52  Next checkNext() override;
53  void read(edm::EventPrincipal& eventPrincipal) override;
56 
57 private:
58  void rewind_() override;
59 
60  void maybeOpenNewLumiSection(const uint32_t lumiSection);
64 
65  void readSupervisor();
66  void readWorker(unsigned int tid);
67  void threadError();
69 
70  //functions for single buffered reader
72 
73  //monitoring
74  void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
75 
76  long initFileList();
78  std::string& nextFile,
79  uint32_t& fsize,
80  uint64_t& lockWaitTime);
81 
82  //variables
85 
87 
88  unsigned int eventChunkSize_; // for buffered read-ahead
89  unsigned int eventChunkBlock_; // how much read(2) asks at the time
90  unsigned int readBlocks_;
91  unsigned int numBuffers_;
92  unsigned int maxBufferedFiles_;
93  unsigned int numConcurrentReads_;
94  std::atomic<unsigned int> readingFilesCount_;
95 
96  // get LS from filename instead of event header
97  const bool getLSFromFilename_;
99  const bool verifyChecksum_;
100  const bool useL1EventID_;
101  const std::vector<unsigned int> testTCDSFEDRange_;
102  std::vector<std::string> fileNames_;
104  //std::vector<std::string> fileNamesSorted_;
105 
106  const bool fileListMode_;
107  unsigned int fileListIndex_ = 0;
108  const bool fileListLoopMode_;
109  unsigned int loopModeIterationInc_ = 0;
110 
113 
115 
116  std::unique_ptr<FRDEventMsgView> event_;
117 
120 
121  unsigned int currentLumiSection_;
122  uint32_t eventRunNumber_ = 0;
123  uint32_t GTPEventID_ = 0;
124  uint32_t L1EventID_ = 0;
125  unsigned char* tcds_pointer_;
126  unsigned int eventsThisLumi_;
127  unsigned long eventsThisRun_ = 0;
128 
131 
132  /*
133  *
134  * Multithreaded file reader
135  *
136  **/
137 
138  typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
139 
140  uint16_t detectedFRDversion_ = 0;
141  std::unique_ptr<InputFile> currentFile_;
142  bool chunkIsFree_ = false;
143 
145  std::unique_ptr<std::thread> readSupervisorThread_;
146  std::vector<std::thread*> workerThreads_;
147 
148  tbb::concurrent_queue<unsigned int> workerPool_;
149  std::vector<ReaderInfo> workerJob_;
150 
151  tbb::concurrent_queue<InputChunk*> freeChunks_;
152  tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
153 
155  std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
156  std::vector<unsigned int> tid_active_;
157 
158  std::atomic<bool> quit_threads_;
159  std::vector<unsigned int> thread_quit_signal;
160  bool setExceptionState_ = false;
162  std::condition_variable startupCv_;
163 
165  std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
166  std::list<std::pair<int, std::string>> fileNamesToDelete_;
168  std::vector<int> streamFileTracker_;
169  unsigned int checkEvery_ = 10;
170 
171  //supervisor thread wakeup
173  std::condition_variable cvWakeup_;
174 
175  //variables for the single buffered mode
177  int fileDescriptor_ = -1;
178  uint32_t bufferInputRead_ = 0;
179 
180  std::atomic<bool> threadInit_;
181 
182  std::map<unsigned int, unsigned int> sourceEventsReport_;
184 };
185 
186 struct InputChunk {
187  unsigned char* buf_;
188  InputChunk* next_ = nullptr;
189  uint32_t size_;
190  uint32_t usedSize_ = 0;
191  unsigned int index_;
192  unsigned int offset_;
193  unsigned int fileIndex_;
194  std::atomic<bool> readComplete_;
195 
196  InputChunk(unsigned int index, uint32_t size) : size_(size), index_(index) {
197  buf_ = new unsigned char[size_];
198  reset(0, 0, 0);
199  }
200  void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex) {
201  offset_ = newOffset;
202  usedSize_ = toRead;
203  fileIndex_ = fileIndex;
204  readComplete_ = false;
205  }
206 
207  ~InputChunk() { delete[] buf_; }
208 };
209 
210 struct InputFile {
213  unsigned int lumi_;
216  int rawFd_;
218  uint16_t rawHeaderSize_;
219  uint32_t nChunks_;
220  int nEvents_;
221  unsigned int nProcessed_;
222 
223  tbb::concurrent_vector<InputChunk*> chunks_;
224 
225  uint32_t bufferPosition_ = 0;
226  uint32_t chunkPosition_ = 0;
227  unsigned int currentChunk_ = 0;
228 
230  unsigned int lumi = 0,
231  std::string const& name = std::string(),
232  bool deleteFile = true,
233  int rawFd = -1,
234  uint64_t fileSize = 0,
235  uint16_t rawHeaderSize = 0,
236  uint32_t nChunks = 0,
237  int nEvents = 0,
238  FedRawDataInputSource* parent = nullptr)
239  : parent_(parent),
240  status_(status),
241  lumi_(lumi),
242  fileName_(name),
243  deleteFile_(deleteFile),
244  rawFd_(rawFd),
245  fileSize_(fileSize),
246  rawHeaderSize_(rawHeaderSize),
247  nChunks_(nChunks),
248  nEvents_(nEvents),
249  nProcessed_(0) {
250  for (unsigned int i = 0; i < nChunks; i++)
251  chunks_.push_back(nullptr);
252  }
253  ~InputFile();
254 
256 
257  bool waitForChunk(unsigned int chunkid) {
258  //some atomics to make sure everything is cache synchronized for the main thread
259  return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
260  }
261  bool advance(unsigned char*& dataPosition, const size_t size);
262  void moveToPreviousChunk(const size_t size, const size_t offset);
263  void rewindChunk(const size_t size);
264  void unsetDeleteFile() { deleteFile_ = false; }
265  int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
266 };
267 
268 #endif // EventFilter_Utilities_FedRawDataInputSource_h
269 
size
Write out results.
Definition: fillJson.h:27
unsigned int lumi_
std::vector< std::string > fileNames_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
int64_t fileSizeLeft() const
unsigned int offset_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
void maybeOpenNewLumiSection(const uint32_t lumiSection)
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
static std::mutex mutex
Definition: Proxy.cc:8
void rewindChunk(const size_t size)
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), bool deleteFile=true, int rawFd=-1, uint64_t fileSize=0, uint16_t rawHeaderSize=0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent=nullptr)
evf::EvFDaqDirector::FileStatus status_
unsigned int index_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
InputChunk(unsigned int index, uint32_t size)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
std::pair< InputFile *, InputChunk * > ReaderInfo
uint16_t rawHeaderSize_
std::unique_ptr< std::thread > readSupervisorThread_
FedRawDataInputSource * parent_
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::list< std::pair< int, std::string > > fileNamesToDelete_
const std::vector< unsigned int > testTCDSFEDRange_
uint32_t bufferPosition_
bool advance(unsigned char *&dataPosition, const size_t size)
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setMonStateSup(evf::FastMonState::InputState state)
InputFile(std::string &name)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
std::vector< std::thread * > workerThreads_
void setMonState(evf::FastMonState::InputState state)
std::atomic< bool > readComplete_
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:13
unsigned int currentChunk_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
ItemType state() const
Definition: InputSource.h:332
std::atomic< bool > threadInit_
evf::EvFDaqDirector * daqDirector_
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
InputChunk * next_
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
int events