CMS 3D CMS Logo

FedRawDataInputSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
2 #define EventFilter_Utilities_FedRawDataInputSource_h
3 
4 #include <memory>
5 #include <cstdio>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include "tbb/concurrent_queue.h"
10 #include "tbb/concurrent_vector.h"
11 
12 #include "boost/filesystem.hpp"
13 
22 
24 
26 class InputSourceDescription;
27 class ParameterSet;
28 
29 struct InputFile;
30 struct InputChunk;
31 
32 namespace evf {
33 class FastMonitoringService;
34 }
35 
36 namespace jsoncollector {
37 class DataPointDefinition;
38 }
39 
40 
42 
43 friend struct InputFile;
44 friend struct InputChunk;
45 
46 public:
48  ~FedRawDataInputSource() override;
49  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
50 
51  std::pair<bool,unsigned int> getEventReport(unsigned int lumi, bool erase);
52 protected:
53  bool checkNextEvent() override;
54  void read(edm::EventPrincipal& eventPrincipal) override;
55 
56 private:
57  void rewind_() override;
58 
59  void maybeOpenNewLumiSection(const uint32_t lumiSection);
60  void createBoLSFile(const uint32_t lumiSection,bool checkIfExists);
62  evf::EvFDaqDirector::FileStatus getNextEvent();
63  edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection&);
64  void deleteFile(std::string const&);
65  int grabNextJsonFile(boost::filesystem::path const&);
66 
67  void readSupervisor();
68  void readWorker(unsigned int tid);
69  void threadError();
70  bool exceptionState() {return setExceptionState_;}
71 
72  //functions for single buffered reader
73  void readNextChunkIntoBuffer(InputFile *file);
74 
75  //monitoring
76  void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);
77 
78  long initFileList();
79  evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
80 
81  //variables
83  evf::EvFDaqDirector* daqDirector_=nullptr;
84 
86 
87  unsigned int eventChunkSize_; // for buffered read-ahead
88  unsigned int eventChunkBlock_; // how much read(2) asks at the time
89  unsigned int readBlocks_;
90  unsigned int numBuffers_;
91  unsigned int maxBufferedFiles_;
92  unsigned int numConcurrentReads_;
93  std::atomic<unsigned int> readingFilesCount_;
94 
95  // get LS from filename instead of event header
96  const bool getLSFromFilename_;
97  const bool verifyAdler32_;
98  const bool verifyChecksum_;
99  const bool useL1EventID_;
100  std::vector<std::string> fileNames_;
101  //std::vector<std::string> fileNamesSorted_;
102 
103  const bool fileListMode_;
104  unsigned int fileListIndex_ = 0;
105  const bool fileListLoopMode_;
106  unsigned int loopModeIterationInc_ = 0;
107 
110 
112 
113  std::unique_ptr<FRDEventMsgView> event_;
114 
117 
118  unsigned int currentLumiSection_;
119  uint32_t eventRunNumber_=0;
120  uint32_t GTPEventID_ = 0;
121  uint32_t L1EventID_ = 0;
122  unsigned char *tcds_pointer_;
123  unsigned int eventsThisLumi_;
124  unsigned long eventsThisRun_ = 0;
125 
127 
128  /*
129  *
130  * Multithreaded file reader
131  *
132  **/
133 
134  typedef std::pair<InputFile*,InputChunk*> ReaderInfo;
135 
136  uint32 detectedFRDversion_=0;
137  InputFile *currentFile_ = nullptr;
138  bool chunkIsFree_=false;
139 
140  bool startedSupervisorThread_ = false;
141  std::unique_ptr<std::thread> readSupervisorThread_;
142  std::vector<std::thread*> workerThreads_;
143 
144  tbb::concurrent_queue<unsigned int> workerPool_;
145  std::vector<ReaderInfo> workerJob_;
146 
147  tbb::concurrent_queue<InputChunk*> freeChunks_;
148  tbb::concurrent_queue<InputFile*> fileQueue_;
149 
151  std::vector<std::condition_variable*> cvReader_;
152  std::vector<unsigned int> tid_active_;
153 
154  std::atomic<bool> quit_threads_;
155  std::vector<unsigned int> thread_quit_signal;
156  bool setExceptionState_ = false;
158  std::condition_variable startupCv_;
159 
160  int currentFileIndex_ = -1;
161  std::list<std::pair<int,InputFile*>> filesToDelete_;
162  std::list<std::pair<int,std::string>> fileNamesToDelete_;
164  std::vector<int> streamFileTracker_;
165  unsigned int nStreams_ = 0;
166  unsigned int checkEvery_ = 10;
167 
168  //supervisor thread wakeup
170  std::condition_variable cvWakeup_;
171 
172  //variables for the single buffered mode
174  int fileDescriptor_ = -1;
175  uint32_t bufferInputRead_ = 0;
176 
177  std::atomic<bool> threadInit_;
178 
179  std::map<unsigned int,unsigned int> sourceEventsReport_;
181 };
182 
183 
184 struct InputChunk {
185  unsigned char * buf_;
186  InputChunk *next_ = nullptr;
187  uint32_t size_;
188  uint32_t usedSize_ = 0;
189  unsigned int index_;
190  unsigned int offset_;
191  unsigned int fileIndex_;
192  std::atomic<bool> readComplete_;
193 
194  InputChunk(unsigned int index, uint32_t size): size_(size),index_(index) {
195  buf_ = new unsigned char[size_];
196  reset(0,0,0);
197  }
198  void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex) {
199  offset_=newOffset;
200  usedSize_=toRead;
201  fileIndex_=fileIndex;
202  readComplete_=false;
203  }
204 
205  ~InputChunk() {delete[] buf_;}
206 };
207 
208 
209 struct InputFile {
212  unsigned int lumi_;
214  uint32_t fileSize_;
215  uint32_t nChunks_;
216  int nEvents_;
217  unsigned int nProcessed_;
218 
219  tbb::concurrent_vector<InputChunk*> chunks_;
220 
221  uint32_t bufferPosition_ = 0;
222  uint32_t chunkPosition_ = 0;
223  unsigned int currentChunk_ = 0;
224 
226  uint32_t fileSize =0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent = nullptr):
227  parent_(parent),
228  status_(status),
229  lumi_(lumi),
230  fileName_(name),
231  fileSize_(fileSize),
232  nChunks_(nChunks),
233  nEvents_(nEvents),
234  nProcessed_(0)
235  {
236  for (unsigned int i=0;i<nChunks;i++)
237  chunks_.push_back(nullptr);
238  }
239 
240  InputFile(std::string & name):fileName_(name) {}
241 
242  bool waitForChunk(unsigned int chunkid) {
243  //some atomics to make sure everything is cache synchronized for the main thread
244  return chunks_[chunkid]!=nullptr && chunks_[chunkid]->readComplete_;
245  }
246  bool advance(unsigned char* & dataPosition, const size_t size);
247  void moveToPreviousChunk(const size_t size, const size_t offset);
248  void rewindChunk(const size_t size);
249 };
250 
251 
252 #endif // EventFilter_Utilities_FedRawDataInputSource_h
253 
size
Write out results.
unsigned int lumi_
std::vector< std::string > fileNames_
std::condition_variable cvWakeup_
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
static boost::mutex mutex
Definition: LHEProxy.cc:11
tbb::concurrent_queue< unsigned int > workerPool_
InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), uint32_t fileSize=0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent=0)
evf::EvFDaqDirector::FileStatus status_
unsigned int index_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
InputChunk(unsigned int index, uint32_t size)
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< int > streamFileTracker_
unsigned char * buf_
InputFile(std::string &name)
edm::ProcessHistoryID processHistoryID_
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
std::vector< std::thread * > workerThreads_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
def ls(path, rec=False)
Definition: eostools.py:348
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
std::list< std::pair< int, std::string > > fileNamesToDelete_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
tbb::concurrent_queue< InputChunk * > freeChunks_
UInt_t nEvents
Definition: hcalCalib.cc:42
void reset(double vett[256])
Definition: TPedValues.cc:11