CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
2 #define EventFilter_Utilities_FedRawDataInputSource_h
3 
4 #include <memory>
5 #include <stdio.h>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include "tbb/concurrent_queue.h"
10 #include "tbb/concurrent_vector.h"
11 
12 #include "boost/filesystem.hpp"
13 
22 
24 
26 
28 class InputSourceDescription;
29 class ParameterSet;
30 
31 class InputFile;
32 class InputChunk;
33 
34 namespace evf {
36 }
37 
38 namespace jsoncollector {
39 class DataPointDefinition;
40 }
41 
42 
44 
45 friend class InputFile;
46 friend class InputChunk;
47 
48 public:
50  virtual ~FedRawDataInputSource();
51 
52 protected:
53  virtual bool checkNextEvent() override;
54  virtual void read(edm::EventPrincipal& eventPrincipal) override;
55 
56 private:
57  virtual void preForkReleaseResources() override;
58  virtual void postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource>) override;
59  virtual void rewind_() override;
60 
61  void maybeOpenNewLumiSection(const uint32_t lumiSection);
64  edm::Timestamp fillFEDRawDataCollection(std::auto_ptr<FEDRawDataCollection>&);
65  void deleteFile(std::string const&);
67  void renameToNextFree(std::string const& fileName) const;
68 
69  void readSupervisor();
70  void readWorker(unsigned int tid);
71  void threadError();
73 
74  //functions for single buffered reader
76 
77  //variables
80 
82 
83  unsigned int eventChunkSize_; // for buffered read-ahead
84  unsigned int eventChunkBlock_; // how much read(2) asks at the time
85  unsigned int readBlocks_;
86  unsigned int numBuffers_;
87  unsigned int numConcurrentReads_;
88 
89  // get LS from filename instead of event header
90  const bool getLSFromFilename_;
91  const bool verifyAdler32_;
92  const bool useL1EventID_;
94 
96 
98 
100 
101  std::unique_ptr<FRDEventMsgView> event_;
102 
105 
106  unsigned int currentLumiSection_;
107  uint32_t eventRunNumber_=0;
108  uint32_t GTPEventID_ = 0;
109  uint32_t L1EventID_ = 0;
110  unsigned char *tcds_pointer_;
111  unsigned int eventsThisLumi_;
112  unsigned long eventsThisRun_ = 0;
113 
115 
116  /*
117  *
118  * Multithreaded file reader
119  *
120  **/
121 
122  typedef std::pair<InputFile*,InputChunk*> ReaderInfo;
123 
126  bool chunkIsFree_=false;
127 
129  std::unique_ptr<std::thread> readSupervisorThread_;
130  std::vector<std::thread*> workerThreads_;
131 
132  tbb::concurrent_queue<unsigned int> workerPool_;
133  std::vector<ReaderInfo> workerJob_;
134 
135  tbb::concurrent_queue<InputChunk*> freeChunks_;
136  tbb::concurrent_queue<InputFile*> fileQueue_;
137 
139  std::vector<std::condition_variable*> cvReader_;
140 
141  std::atomic<bool> quit_threads_;
142  std::vector<bool> thread_quit_signal;
143  bool setExceptionState_ = false;
145  std::condition_variable startupCv_;
146 
148  std::list<std::pair<int,InputFile*>> filesToDelete_;
149  std::list<std::pair<int,std::string>> fileNamesToDelete_;
151  std::vector<int> *streamFileTrackerPtr_ = nullptr;
152  unsigned int nStreams_ = 0;
153  unsigned int checkEvery_ = 10;
154 
155  //supervisor thread wakeup
157  std::condition_variable cvWakeup_;
158 
159  //variables for the single buffered mode
161  int fileDescriptor_ = -1;
162  uint32_t bufferInputRead_ = 0;
163 
164  std::atomic<bool> threadInit_;
165 
166 };
167 
168 
169 struct InputChunk {
170  unsigned char * buf_;
171  InputChunk *next_ = nullptr;
172  uint32_t size_;
173  uint32_t usedSize_ = 0;
174  unsigned int index_;
175  unsigned int offset_;
176  unsigned int fileIndex_;
177  std::atomic<bool> readComplete_;
178 
179  InputChunk(unsigned int index, uint32_t size): size_(size),index_(index) {
180  buf_ = new unsigned char[size_];
181  reset(0,0,0);
182  }
183  void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex) {
184  offset_=newOffset;
185  usedSize_=toRead;
186  fileIndex_=fileIndex;
187  readComplete_=false;
188  }
189 
190  ~InputChunk() {delete[] buf_;}
191 };
192 
193 
194 struct InputFile {
197  unsigned int lumi_;
199  uint32_t fileSize_;
200  uint32_t nChunks_;
201  unsigned int nEvents_;
202  unsigned int nProcessed_;
203 
204  tbb::concurrent_vector<InputChunk*> chunks_;
205 
206  uint32_t bufferPosition_ = 0;
207  uint32_t chunkPosition_ = 0;
208  unsigned int currentChunk_ = 0;
209 
211  uint32_t fileSize =0, uint32_t nChunks=0, unsigned int nEvents=0, FedRawDataInputSource *parent = nullptr):
212  parent_(parent),
213  status_(status),
214  lumi_(lumi),
215  fileName_(name),
216  fileSize_(fileSize),
217  nChunks_(nChunks),
218  nEvents_(nEvents),
219  nProcessed_(0)
220  {
221  for (unsigned int i=0;i<nChunks;i++)
222  chunks_.push_back(nullptr);
223  }
224 
226 
227  bool waitForChunk(unsigned int chunkid) {
228  //some atomics to make sure everything is cache synchronized for the main thread
229  return chunks_[chunkid]!=nullptr && chunks_[chunkid]->readComplete_;
230  }
231  bool advance(unsigned char* & dataPosition, const size_t size);
232  void moveToPreviousChunk(const size_t size, const size_t offset);
233  void rewindChunk(const size_t size);
234 };
235 
236 
237 #endif // EventFilter_Utilities_FedRawDataInputSource_h
238 
InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), uint32_t fileSize=0, uint32_t nChunks=0, unsigned int nEvents=0, FedRawDataInputSource *parent=0)
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
std::condition_variable cvWakeup_
virtual void read(edm::EventPrincipal &eventPrincipal) override
list parent
Definition: dbtoconf.py:74
virtual void rewind_() override
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
static boost::mutex mutex
Definition: LHEProxy.cc:11
tbb::concurrent_queue< unsigned int > workerPool_
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tuple lumi
Definition: fjr2json.py:35
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
unsigned int index_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
InputChunk(unsigned int index, uint32_t size)
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
uint32_t bufferPosition_
tuple path
else: Piece not in the list, fine.
const edm::RunNumber_t runNumber_
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
int grabNextJsonFile(boost::filesystem::path const &)
InputFile(std::string &name)
edm::ProcessHistoryID processHistoryID_
virtual void preForkReleaseResources() override
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned int offset(bool)
std::vector< std::thread * > workerThreads_
unsigned int uint32
Definition: MsgTools.h:13
const std::string fuOutputDir_
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &)
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
virtual void postForkReacquireResources(boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
std::list< std::pair< int, std::string > > fileNamesToDelete_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void readWorker(unsigned int tid)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
unsigned int RunNumber_t
Definition: EventRange.h:32
InputChunk * next_
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
UInt_t nEvents
Definition: hcalCalib.cc:42
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
tuple size
Write out results.
void renameToNextFree(std::string const &fileName) const