CMS 3D CMS Logo

FedRawDataInputSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
2 #define EventFilter_Utilities_FedRawDataInputSource_h
3 
4 #include <memory>
5 #include <cstdio>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include "tbb/concurrent_queue.h"
10 #include "tbb/concurrent_vector.h"
11 
12 #include "boost/filesystem.hpp"
13 
22 
24 
26 class InputSourceDescription;
27 class ParameterSet;
28 
29 struct InputFile;
30 struct InputChunk;
31 
32 namespace evf {
34 }
35 
37  friend struct InputFile;
38  friend struct InputChunk;
39 
40 public:
42  ~FedRawDataInputSource() override;
43  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
44 
45  std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
46 
47 protected:
48  bool checkNextEvent() override;
49  void read(edm::EventPrincipal& eventPrincipal) override;
50 
51 private:
52  void rewind_() override;
53 
54  void maybeOpenNewLumiSection(const uint32_t lumiSection);
56  evf::EvFDaqDirector::FileStatus getNextEvent();
57  edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection&);
58 
59  void readSupervisor();
60  void readWorker(unsigned int tid);
61  void threadError();
62  bool exceptionState() { return setExceptionState_; }
63 
64  //functions for single buffered reader
65  void readNextChunkIntoBuffer(InputFile* file);
66 
67  //monitoring
68  void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
69 
70  long initFileList();
72  std::string& nextFile,
73  uint32_t& fsize,
74  uint64_t& lockWaitTime);
75 
76  //variables
77  evf::FastMonitoringService* fms_ = nullptr;
78  evf::EvFDaqDirector* daqDirector_ = nullptr;
79 
81 
82  unsigned int eventChunkSize_; // for buffered read-ahead
83  unsigned int eventChunkBlock_; // how much read(2) asks at the time
84  unsigned int readBlocks_;
85  unsigned int numBuffers_;
86  unsigned int maxBufferedFiles_;
87  unsigned int numConcurrentReads_;
88  std::atomic<unsigned int> readingFilesCount_;
89 
90  // get LS from filename instead of event header
91  const bool getLSFromFilename_;
93  const bool verifyChecksum_;
94  const bool useL1EventID_;
95  std::vector<std::string> fileNames_;
97  //std::vector<std::string> fileNamesSorted_;
98 
99  const bool fileListMode_;
100  unsigned int fileListIndex_ = 0;
101  const bool fileListLoopMode_;
102  unsigned int loopModeIterationInc_ = 0;
103 
106 
108 
109  std::unique_ptr<FRDEventMsgView> event_;
110 
113 
114  unsigned int currentLumiSection_;
115  uint32_t eventRunNumber_ = 0;
116  uint32_t GTPEventID_ = 0;
117  uint32_t L1EventID_ = 0;
118  unsigned char* tcds_pointer_;
119  unsigned int eventsThisLumi_;
120  unsigned long eventsThisRun_ = 0;
121 
122  /*
123  *
124  * Multithreaded file reader
125  *
126  **/
127 
128  typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
129 
130  uint32 detectedFRDversion_ = 0;
131  std::unique_ptr<InputFile> currentFile_;
132  bool chunkIsFree_ = false;
133 
134  bool startedSupervisorThread_ = false;
135  std::unique_ptr<std::thread> readSupervisorThread_;
136  std::vector<std::thread*> workerThreads_;
137 
138  tbb::concurrent_queue<unsigned int> workerPool_;
139  std::vector<ReaderInfo> workerJob_;
140 
141  tbb::concurrent_queue<InputChunk*> freeChunks_;
142  tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
143 
145  std::vector<std::condition_variable*> cvReader_;
146  std::vector<unsigned int> tid_active_;
147 
148  std::atomic<bool> quit_threads_;
149  std::vector<unsigned int> thread_quit_signal;
150  bool setExceptionState_ = false;
152  std::condition_variable startupCv_;
153 
154  int currentFileIndex_ = -1;
155  std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
156  std::list<std::pair<int, std::string>> fileNamesToDelete_;
158  std::vector<int> streamFileTracker_;
159  unsigned int nStreams_ = 0;
160  unsigned int checkEvery_ = 10;
161 
162  //supervisor thread wakeup
164  std::condition_variable cvWakeup_;
165 
166  //variables for the single buffered mode
168  int fileDescriptor_ = -1;
169  uint32_t bufferInputRead_ = 0;
170 
171  std::atomic<bool> threadInit_;
172 
173  std::map<unsigned int, unsigned int> sourceEventsReport_;
175 };
176 
/// A fixed-size byte buffer plus the bookkeeping fields that describe how it
/// is currently being used.
///
/// The constructor allocates `size` bytes and the destructor frees them, so an
/// InputChunk is the sole owner of `buf_`. Copying is explicitly disabled: a
/// copy would leave two objects deleting the same buffer.
struct InputChunk {
  unsigned char* buf_ = nullptr;           // owned storage of size_ bytes
  InputChunk* next_ = nullptr;             // non-owning link to another chunk
  uint32_t size_;                          // capacity of buf_ in bytes
  uint32_t usedSize_ = 0;                  // set from `toRead` in reset()
  unsigned int index_;                     // identifier given at construction
  unsigned int offset_ = 0;                // set from `newOffset` in reset()
  unsigned int fileIndex_ = 0;             // set from `fileIndex` in reset()
  std::atomic<bool> readComplete_{false};  // cleared by reset()

  /// Allocates a buffer of `size` bytes and leaves every bookkeeping field in
  /// the same state that reset(0, 0, 0) would produce.
  InputChunk(unsigned int index, uint32_t size) : buf_(new unsigned char[size]), size_(size), index_(index) {}

  // The struct owns buf_; forbid copies instead of relying on the atomic
  // member to suppress them implicitly.
  InputChunk(const InputChunk&) = delete;
  InputChunk& operator=(const InputChunk&) = delete;

  /// Re-arms the chunk for a new read: records the offset, the number of bytes
  /// to read and the file index, and clears the completion flag.
  void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex) {
    offset_ = newOffset;
    usedSize_ = toRead;
    fileIndex_ = fileIndex;
    readComplete_ = false;
  }

  ~InputChunk() { delete[] buf_; }
};
200 
201 struct InputFile {
204  unsigned int lumi_;
207  int rawFd_;
209  uint16_t rawHeaderSize_;
210  uint32_t nChunks_;
211  int nEvents_;
212  unsigned int nProcessed_;
213 
214  tbb::concurrent_vector<InputChunk*> chunks_;
215 
216  uint32_t bufferPosition_ = 0;
217  uint32_t chunkPosition_ = 0;
218  unsigned int currentChunk_ = 0;
219 
221  unsigned int lumi = 0,
222  std::string const& name = std::string(),
223  bool deleteFile = true,
224  int rawFd = -1,
225  uint64_t fileSize = 0,
226  uint16_t rawHeaderSize = 0,
227  uint32_t nChunks = 0,
228  int nEvents = 0,
229  FedRawDataInputSource* parent = nullptr)
230  : parent_(parent),
231  status_(status),
232  lumi_(lumi),
233  fileName_(name),
234  deleteFile_(deleteFile),
235  rawFd_(rawFd),
236  fileSize_(fileSize),
237  rawHeaderSize_(rawHeaderSize),
238  nChunks_(nChunks),
239  nEvents_(nEvents),
240  nProcessed_(0) {
241  for (unsigned int i = 0; i < nChunks; i++)
242  chunks_.push_back(nullptr);
243  }
244  ~InputFile();
245 
246  InputFile(std::string& name) : fileName_(name) {}
247 
248  bool waitForChunk(unsigned int chunkid) {
249  //some atomics to make sure everything is cache synchronized for the main thread
250  return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
251  }
252  bool advance(unsigned char*& dataPosition, const size_t size);
253  void moveToPreviousChunk(const size_t size, const size_t offset);
254  void rewindChunk(const size_t size);
255 };
256 
257 #endif // EventFilter_Utilities_FedRawDataInputSource_h
258 
size
Write out results.
static boost::mutex mutex
Definition: Proxy.cc:9
Definition: fillJson.h:27
unsigned int lumi_
std::vector< std::string > fileNames_
std::condition_variable cvWakeup_
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
evf::EvFDaqDirector::FileStatus status_
unsigned int index_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
InputChunk(unsigned int index, uint32_t size)
std::pair< InputFile *, InputChunk * > ReaderInfo
uint16_t rawHeaderSize_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::list< std::pair< int, std::string > > fileNamesToDelete_
std::vector< int > streamFileTracker_
unsigned char * buf_
InputFile(std::string &name)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
std::vector< std::thread * > workerThreads_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int nProcessed_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
std::vector< unsigned int > thread_quit_signal
InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), bool deleteFile=true, int rawFd=-1, uint64_t fileSize=0, uint16_t rawHeaderSize=0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent=0)
std::atomic< unsigned int > readingFilesCount_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
tbb::concurrent_queue< InputChunk * > freeChunks_
UInt_t nEvents
Definition: hcalCalib.cc:41
void reset(double vett[256])
Definition: TPedValues.cc:11
std::unique_ptr< InputFile > currentFile_