CMS 3D CMS Logo

FedRawDataInputSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
2 #define EventFilter_Utilities_FedRawDataInputSource_h
3 
4 #include <condition_variable>
5 #include <cstdio>
6 #include <filesystem>
7 #include <memory>
8 #include <mutex>
9 #include <thread>
10 #include <random>
11 #include <algorithm>
12 
13 #include "oneapi/tbb/concurrent_queue.h"
14 #include "oneapi/tbb/concurrent_vector.h"
15 
24 
27 
29 class InputSourceDescription;
30 class ParameterSet;
31 
32 class InputFile;
33 struct InputChunk;
34 
35 namespace evf {
37  namespace FastMonState {
38  enum InputState : short;
39  }
40 } // namespace evf
41 
43  friend class InputFile;
44  friend struct InputChunk;
45 
46 public:
48  ~FedRawDataInputSource() override;
49  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
50 
51  std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
52 
53 protected:
54  Next checkNext() override;
55  void read(edm::EventPrincipal& eventPrincipal) override;
58 
59 private:
60  void rewind_() override;
61 
62  void maybeOpenNewLumiSection(const uint32_t lumiSection);
66 
67  void readSupervisor();
68  void readWorker(unsigned int tid);
69  void threadError();
71 
72  //functions for single buffered reader
74 
75  //monitoring
76  void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
77 
78  long initFileList();
80  std::string& nextFile,
81  uint32_t& fsize,
82  uint64_t& lockWaitTime);
83 
84  //variables
87 
89 
90  unsigned int eventChunkSize_; // for buffered read-ahead
91  unsigned int eventChunkBlock_; // how much read(2) asks at the time
92  unsigned int readBlocks_;
93  unsigned int numBuffers_;
94  unsigned int maxBufferedFiles_;
95  unsigned int numConcurrentReads_;
96  std::atomic<unsigned int> readingFilesCount_;
97 
98  // get LS from filename instead of event header
99  const bool getLSFromFilename_;
101  const bool verifyChecksum_;
102  const bool useL1EventID_;
103  const std::vector<unsigned int> testTCDSFEDRange_;
104  std::vector<std::string> fileNames_;
106  //std::vector<std::string> fileNamesSorted_;
107 
108  const bool fileListMode_;
109  unsigned int fileListIndex_ = 0;
110  const bool fileListLoopMode_;
111  unsigned int loopModeIterationInc_ = 0;
112 
115 
117 
118  std::unique_ptr<FRDEventMsgView> event_;
119 
122 
123  unsigned int currentLumiSection_;
124  uint32_t eventRunNumber_ = 0;
125  uint32_t GTPEventID_ = 0;
126  uint32_t L1EventID_ = 0;
127  unsigned char* tcds_pointer_;
128  unsigned int eventsThisLumi_;
129  unsigned long eventsThisRun_ = 0;
130 
133 
134  /*
135  *
136  * Multithreaded file reader
137  *
138  **/
139 
140  typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
141 
142  uint16_t detectedFRDversion_ = 0;
143  std::unique_ptr<InputFile> currentFile_;
144  bool chunkIsFree_ = false;
145 
147  std::unique_ptr<std::thread> readSupervisorThread_;
148  std::vector<std::thread*> workerThreads_;
149 
150  tbb::concurrent_queue<unsigned int> workerPool_;
151  std::vector<ReaderInfo> workerJob_;
152 
153  tbb::concurrent_queue<InputChunk*> freeChunks_;
154  tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
155 
157  std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
158  std::vector<unsigned int> tid_active_;
159 
160  std::atomic<bool> quit_threads_;
161  std::vector<unsigned int> thread_quit_signal;
162  bool setExceptionState_ = false;
164  std::condition_variable startupCv_;
165 
167  std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
168  std::list<std::pair<int, std::string>> fileNamesToDelete_;
170  std::vector<int> streamFileTracker_;
171  unsigned int checkEvery_ = 10;
172 
173  //supervisor thread wakeup
175  std::condition_variable cvWakeup_;
176 
177  //variables for the single buffered mode
179  int fileDescriptor_ = -1;
180  uint32_t bufferInputRead_ = 0;
181 
182  std::atomic<bool> threadInit_;
183 
184  std::map<unsigned int, unsigned int> sourceEventsReport_;
186 };
187 
188 struct InputChunk {
189  unsigned char* buf_;
190  InputChunk* next_ = nullptr;
193  //unsigned int index_;
195  unsigned int fileIndex_;
196  std::atomic<bool> readComplete_;
197 
199  buf_ = new unsigned char[size_];
200  reset(0, 0, 0);
201  }
202  void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex) {
203  offset_ = newOffset;
204  usedSize_ = toRead;
205  fileIndex_ = fileIndex;
206  readComplete_ = false;
207  }
208 
209  bool resize(uint64_t wantedSize, uint64_t maxSize) {
210  if (wantedSize > maxSize)
211  return false;
212  if (size_ < wantedSize) {
213  size_ = uint64_t(wantedSize * 1.05);
214  delete[] buf_;
215  buf_ = new unsigned char[size_];
216  }
217  return true;
218  }
219 
220  ~InputChunk() { delete[] buf_; }
221 };
222 
223 class InputFile {
224 public:
227  unsigned int lumi_;
229  //used by DAQSource
230  std::vector<std::string> fileNames_;
231  std::vector<uint64_t> diskFileSizes_;
232  std::vector<uint64_t> bufferOffsets_;
233  std::vector<uint64_t> fileSizes_;
234  std::vector<unsigned int> fileOrder_;
236  int rawFd_;
238  uint16_t rawHeaderSize_;
239  uint16_t nChunks_;
240  uint16_t numFiles_;
241  int nEvents_;
242  unsigned int nProcessed_;
243 
244  tbb::concurrent_vector<InputChunk*> chunks_;
245 
246  uint32_t bufferPosition_ = 0;
247  uint32_t chunkPosition_ = 0;
248  unsigned int currentChunk_ = 0;
249 
251  unsigned int lumi = 0,
252  std::string const& name = std::string(),
253  bool deleteFile = true,
254  int rawFd = -1,
255  uint64_t fileSize = 0,
256  uint16_t rawHeaderSize = 0,
257  uint16_t nChunks = 0,
258  int nEvents = 0,
259  FedRawDataInputSource* parent = nullptr)
260  : parent_(parent),
261  status_(status),
262  lumi_(lumi),
263  fileName_(name),
264  deleteFile_(deleteFile),
265  rawFd_(rawFd),
266  fileSize_(fileSize),
267  rawHeaderSize_(rawHeaderSize),
268  nChunks_(nChunks),
269  numFiles_(1),
270  nEvents_(nEvents),
271  nProcessed_(0) {
272  fileNames_.push_back(name);
273  fileOrder_.push_back(fileOrder_.size());
274  diskFileSizes_.push_back(fileSize);
275  fileSizes_.push_back(0);
276  bufferOffsets_.push_back(0);
277  chunks_.reserve(nChunks_);
278  for (unsigned int i = 0; i < nChunks; i++)
279  chunks_.push_back(nullptr);
280  }
281  virtual ~InputFile();
282 
283  void setChunks(uint16_t nChunks) {
284  nChunks_ = nChunks;
285  chunks_.clear();
286  chunks_.reserve(nChunks_);
287  for (unsigned int i = 0; i < nChunks_; i++)
288  chunks_.push_back(nullptr);
289  }
290 
292  size_t prevOffset = bufferOffsets_.back();
293  size_t prevSize = diskFileSizes_.back();
294  numFiles_++;
295  fileNames_.push_back(name);
296  fileOrder_.push_back(fileOrder_.size());
297  diskFileSizes_.push_back(size);
298  fileSizes_.push_back(0);
299  bufferOffsets_.push_back(prevOffset + prevSize);
300  }
301 
302  bool waitForChunk(unsigned int chunkid) {
303  //some atomics to make sure everything is cache synchronized for the main thread
304  return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
305  }
306  bool advance(unsigned char*& dataPosition, const size_t size);
307  void moveToPreviousChunk(const size_t size, const size_t offset);
308  void rewindChunk(const size_t size);
309  void unsetDeleteFile() { deleteFile_ = false; }
310  void randomizeOrder(std::default_random_engine& rng) {
311  std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
312  }
313  uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
314  int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
315 };
316 
317 #endif // EventFilter_Utilities_FedRawDataInputSource_h
318 
size
Write out results.
void setChunks(uint16_t nChunks)
Definition: fillJson.h:27
std::vector< std::string > fileNames_
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
std::vector< uint64_t > fileSizes_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
static std::mutex mutex
Definition: Proxy.cc:8
FedRawDataInputSource * parent_
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
bool advance(unsigned char *&dataPosition, const size_t size)
std::vector< uint64_t > bufferOffsets_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
std::pair< InputFile *, InputChunk * > ReaderInfo
std::unique_ptr< std::thread > readSupervisorThread_
uint16_t rawHeaderSize_
int64_t fileSizeLeft() const
bool waitForChunk(unsigned int chunkid)
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::list< std::pair< int, std::string > > fileNamesToDelete_
const std::vector< unsigned int > testTCDSFEDRange_
void rewindChunk(const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
std::vector< int > streamFileTracker_
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
void randomizeOrder(std::default_random_engine &rng)
unsigned char * buf_
evf::EvFDaqDirector::FileStatus status_
void setMonStateSup(evf::FastMonState::InputState state)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
void appendFile(std::string const &name, uint64_t size)
std::vector< std::thread * > workerThreads_
void setMonState(evf::FastMonState::InputState state)
std::atomic< bool > readComplete_
InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), bool deleteFile=true, int rawFd=-1, uint64_t fileSize=0, uint16_t rawHeaderSize=0, uint16_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent=nullptr)
def ls(path, rec=False)
Definition: eostools.py:349
tbb::concurrent_vector< InputChunk * > chunks_
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
unsigned int nProcessed_
uint32_t chunkPosition_
std::condition_variable startupCv_
ItemType state() const
Definition: InputSource.h:332
std::atomic< bool > threadInit_
unsigned int lumi_
evf::EvFDaqDirector * daqDirector_
void readWorker(unsigned int tid)
std::vector< unsigned int > fileOrder_
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
uint64_t currentChunkSize() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
bool resize(uint64_t wantedSize, uint64_t maxSize)
InputChunk * next_
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
unsigned int currentChunk_
evf::EvFDaqDirector::FileStatus nextEvent()
bidiiter shuffle(bidiiter begin, bidiiter end, size_t num_random)
Definition: Utilities.h:27
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
int events
std::vector< uint64_t > diskFileSizes_
InputChunk(uint64_t size)
std::string fileName_
uint32_t bufferPosition_