CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
2 #define EventFilter_Utilities_FedRawDataInputSource_h
3 
4 #include <memory>
5 #include <stdio.h>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include "tbb/concurrent_queue.h"
10 #include "tbb/concurrent_vector.h"
11 
12 #include "boost/filesystem.hpp"
13 
22 
24 
26 
28 class InputSourceDescription;
29 class ParameterSet;
30 
31 class InputFile;
32 class InputChunk;
33 
34 namespace evf {
35 class FastMonitoringService;
36 }
37 
38 namespace jsoncollector {
39 class DataPointDefinition;
40 }
41 
42 
44 
45 friend class InputFile;
46 friend class InputChunk;
47 
48 public:
50  virtual ~FedRawDataInputSource();
51  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
52 
53  std::pair<bool,unsigned int> getEventReport(unsigned int lumi, bool erase);
54 protected:
55  virtual bool checkNextEvent() override;
56  virtual void read(edm::EventPrincipal& eventPrincipal) override;
57 
58 private:
59  virtual void preForkReleaseResources() override;
60  virtual void postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>) override;
61  virtual void rewind_() override;
62 
63  void maybeOpenNewLumiSection(const uint32_t lumiSection);
64  void createBoLSFile(const uint32_t lumiSection,bool checkIfExists);
68  void deleteFile(std::string const&);
70 
71  void readSupervisor();
72  void readWorker(unsigned int tid);
73  void threadError();
75 
76  //functions for single buffered reader
78 
79  //monitoring
80  void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);
81 
82  long initFileList();
83  evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
84 
85  //variables
88 
90 
91  unsigned int eventChunkSize_; // for buffered read-ahead
92  unsigned int eventChunkBlock_; // how much read(2) asks for at a time
93  unsigned int readBlocks_;
94  unsigned int numBuffers_;
95  unsigned int maxBufferedFiles_;
96  unsigned int numConcurrentReads_;
97  std::atomic<unsigned int> readingFilesCount_;
98 
99  // get LS from filename instead of event header
100  const bool getLSFromFilename_;
101  const bool verifyAdler32_;
102  const bool verifyChecksum_;
103  const bool useL1EventID_;
104  std::vector<std::string> fileNames_;
105  //std::vector<std::string> fileNamesSorted_;
106 
107  const bool fileListMode_;
108  unsigned int fileListIndex_ = 0;
109  const bool fileListLoopMode_;
110  unsigned int loopModeIterationInc_ = 0;
111 
114 
116 
117  std::unique_ptr<FRDEventMsgView> event_;
118 
121 
122  unsigned int currentLumiSection_;
123  uint32_t eventRunNumber_=0;
124  uint32_t GTPEventID_ = 0;
125  uint32_t L1EventID_ = 0;
126  unsigned char *tcds_pointer_;
127  unsigned int eventsThisLumi_;
128  unsigned long eventsThisRun_ = 0;
129 
131 
132  /*
133  *
134  * Multithreaded file reader
135  *
136  **/
137 
138  typedef std::pair<InputFile*,InputChunk*> ReaderInfo;
139 
142  bool chunkIsFree_=false;
143 
145  std::unique_ptr<std::thread> readSupervisorThread_;
146  std::vector<std::thread*> workerThreads_;
147 
148  tbb::concurrent_queue<unsigned int> workerPool_;
149  std::vector<ReaderInfo> workerJob_;
150 
151  tbb::concurrent_queue<InputChunk*> freeChunks_;
152  tbb::concurrent_queue<InputFile*> fileQueue_;
153 
155  std::vector<std::condition_variable*> cvReader_;
156  std::vector<unsigned int> tid_active_;
157 
158  std::atomic<bool> quit_threads_;
159  std::vector<unsigned int> thread_quit_signal;
160  bool setExceptionState_ = false;
162  std::condition_variable startupCv_;
163 
165  std::list<std::pair<int,InputFile*>> filesToDelete_;
166  std::list<std::pair<int,std::string>> fileNamesToDelete_;
168  std::vector<int> *streamFileTrackerPtr_ = nullptr;
169  unsigned int nStreams_ = 0;
170  unsigned int checkEvery_ = 10;
171 
172  //supervisor thread wakeup
174  std::condition_variable cvWakeup_;
175 
176  //variables for the single buffered mode
178  int fileDescriptor_ = -1;
179  uint32_t bufferInputRead_ = 0;
180 
181  std::atomic<bool> threadInit_;
182 
183  std::map<unsigned int,unsigned int> sourceEventsReport_;
185 };
186 
187 
/**
 * A fixed-capacity byte buffer plus the bookkeeping fields that describe
 * what it currently holds.
 *
 * The constructor allocates `size` bytes on the heap and the destructor
 * frees them. The std::atomic member makes the type non-copyable and
 * non-movable, so the buffer cannot end up with two owners.
 */
struct InputChunk {
  unsigned char* buf_;              ///< heap buffer of size_ bytes, owned by this object
  InputChunk* next_ = nullptr;      ///< link to another chunk; never touched by this struct itself
  uint32_t size_;                   ///< capacity of buf_ in bytes, fixed at construction
  uint32_t usedSize_ = 0;           ///< set from the `toRead` argument of reset()
  unsigned int index_;              ///< identifier given at construction
  unsigned int offset_;             ///< set from the `newOffset` argument of reset()
  unsigned int fileIndex_;          ///< set from the `fileIndex` argument of reset()
  std::atomic<bool> readComplete_;  ///< cleared by reset(); set by code outside this struct

  /// Allocates a buffer of `size` bytes and puts the chunk into the all-zero reset state.
  InputChunk(unsigned int index, uint32_t size)
      : buf_(new unsigned char[size]), size_(size), index_(index) {
    reset(0, 0, 0);
  }

  /// Releases the buffer allocated by the constructor.
  ~InputChunk() { delete[] buf_; }

  /// Re-targets the chunk: records the new offset, used size and file index,
  /// then clears the completion flag. The buffer contents are left untouched.
  void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex) {
    offset_ = newOffset;
    usedSize_ = toRead;
    fileIndex_ = fileIndex;
    // The atomic store stays last, after the plain fields are written.
    readComplete_.store(false);
  }
};
211 
212 
// Per-file bookkeeping record: keeps the values handed to the constructor
// (lumi_, fileSize_, nChunks_, nEvents_), a processed counter that starts at 0,
// three position/index fields that start at 0, and a concurrent vector of
// InputChunk pointers with one slot per chunk.
//
// NOTE(review): this listing is incomplete. The constructor below initializes
// parent_, status_ and fileName_, but their declarations and the first line of
// the constructor signature do not appear here - confirm against the real header.
213 struct InputFile {
216  unsigned int lumi_;
218  uint32_t fileSize_;
219  uint32_t nChunks_;
220  int nEvents_;
221  unsigned int nProcessed_;
222 
 // One entry per chunk; entries start out as nullptr (see the constructor loop).
223  tbb::concurrent_vector<InputChunk*> chunks_;
224 
225  uint32_t bufferPosition_ = 0;
226  uint32_t chunkPosition_ = 0;
227  unsigned int currentChunk_ = 0;
228 
 // Constructor (first signature line missing from this listing): copies each
 // argument into the matching member, sets nProcessed_ to 0, and appends
 // nChunks null pointers to chunks_.
230  uint32_t fileSize =0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent = nullptr):
231  parent_(parent),
232  status_(status),
233  lumi_(lumi),
234  fileName_(name),
235  fileSize_(fileSize),
236  nChunks_(nChunks),
237  nEvents_(nEvents),
238  nProcessed_(0)
239  {
240  for (unsigned int i=0;i<nChunks;i++)
241  chunks_.push_back(nullptr);
242  }
243 
245 
 // Returns true only if slot `chunkid` holds a non-null chunk whose
 // readComplete_ flag is set. Despite the name, this performs a single check
 // and returns immediately. `chunkid` is not range-checked here.
246  bool waitForChunk(unsigned int chunkid) {
247  //readComplete_ is a std::atomic<bool>, so this is an atomic read; presumably it is what makes a reader thread's completed chunk visible to the main thread - confirm
248  return chunks_[chunkid]!=nullptr && chunks_[chunkid]->readComplete_;
249  }
 // The three members below are only declared here; their definitions are not
 // part of this listing, so their exact behavior cannot be stated from it.
250  bool advance(unsigned char* & dataPosition, const size_t size);
251  void moveToPreviousChunk(const size_t size, const size_t offset);
252  void rewindChunk(const size_t size);
253 };
254 
255 
256 #endif // EventFilter_Utilities_FedRawDataInputSource_h
257 
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > fileNames_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
virtual void read(edm::EventPrincipal &eventPrincipal) override
virtual void rewind_() override
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static boost::mutex mutex
Definition: LHEProxy.cc:11
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
void maybeOpenNewLumiSection(const uint32_t lumiSection)
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
tuple lumi
Definition: fjr2json.py:35
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
def ls
Definition: eostools.py:348
InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), uint32_t fileSize=0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent=0)
evf::EvFDaqDirector::FileStatus status_
unsigned int index_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
InputChunk(unsigned int index, uint32_t size)
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
uint32_t bufferPosition_
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
int grabNextJsonFile(boost::filesystem::path const &)
InputFile(std::string &name)
edm::ProcessHistoryID processHistoryID_
virtual void preForkReleaseResources() override
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
std::vector< std::thread * > workerThreads_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
std::list< std::pair< int, std::string > > fileNamesToDelete_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
tuple events
Definition: patZpeak.py:19
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
InputChunk * next_
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
UInt_t nEvents
Definition: hcalCalib.cc:42
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
tuple size
Write out results.
tuple status
Definition: mps_update.py:57