CMS 3D CMS Logo

DAQSource.h
Go to the documentation of this file.
1 #ifndef EventFilter_Utilities_DAQSource_h
2 #define EventFilter_Utilities_DAQSource_h
3 
4 #include <condition_variable>
5 #include <cstdio>
6 #include <filesystem>
7 #include <memory>
8 #include <mutex>
9 #include <thread>
10 
11 #include "oneapi/tbb/concurrent_queue.h"
12 #include "oneapi/tbb/concurrent_vector.h"
13 
19 
21 
22 //import InputChunk
24 
26 class InputSourceDescription;
27 class ParameterSet;
28 
29 class RawInputFile;
30 class DataMode;
31 
32 class DataModeFRD;
33 
34 namespace evf {
36  namespace FastMonState {
37  enum InputState : short;
38  }
39 } // namespace evf
40 
42  friend class RawInputFile;
43  friend struct InputChunk;
44 
45 public:
47  ~DAQSource() override;
48  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
49 
50  std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
51  bool useL1EventID() const { return useL1EventID_; } // read-only accessor for the const flag useL1EventID_
52  int currentLumiSection() const { return currentLumiSection_; } // returns currentLumiSection_; note the member is declared unsigned int, so the value is converted to int here
53  int eventRunNumber() const { return eventRunNumber_; } // returns eventRunNumber_; note the member is declared uint32_t, so the value is converted to int here
55  makeEvent(eventPrincipal, aux);
56  }
58 
60 
61 protected:
62  Next checkNext() override;
63  void read(edm::EventPrincipal& eventPrincipal) override;
66 
67 private:
68  void rewind_() override;
71 
72  void maybeOpenNewLumiSection(const uint32_t lumiSection);
73 
74  void readSupervisor();
75  void dataArranger();
76  void readWorker(unsigned int tid);
77  void threadError();
79 
80  //monitoring
81  void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
82 
83  long initFileList();
84  evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
85 
86  //variables
89 
91  uint64_t eventChunkSize_; // used for buffered read-ahead; presumably the size of one chunk (TODO confirm units in DAQSource.cc)
92  uint64_t maxChunkSize_; // used for buffered read-ahead; presumably the upper limit on a chunk's size (TODO confirm)
93  uint64_t eventChunkBlock_; // how much a single read(2) call requests at a time
94  unsigned int readBlocks_;
95  unsigned int numBuffers_;
96  unsigned int maxBufferedFiles_;
97  unsigned int numConcurrentReads_;
98  std::atomic<unsigned int> readingFilesCount_;
99 
100  // get LS from filename instead of event header
102  const bool verifyChecksum_;
103  const bool useL1EventID_;
104  const std::vector<unsigned int> testTCDSFEDRange_;
105  std::vector<std::string> listFileNames_;
107  //std::vector<std::string> fileNamesSorted_;
108 
109  const bool fileListMode_;
110  unsigned int fileListIndex_ = 0;
111  const bool fileListLoopMode_;
112  unsigned int loopModeIterationInc_ = 0;
113 
116 
118 
119  unsigned int currentLumiSection_;
120  uint32_t eventRunNumber_ = 0;
121  uint32_t GTPEventID_ = 0;
122  unsigned int eventsThisLumi_;
123  unsigned long eventsThisRun_ = 0;
124  std::default_random_engine rng_;
125 
126  /*
127  *
128  * Multithreaded file reader
129  *
130  **/
131 
132  typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo; // (file pointer, chunk pointer) pair; this is the element type of workerJob_
133 
134  std::unique_ptr<RawInputFile> currentFile_;
135  bool chunkIsFree_ = false;
136 
138  std::unique_ptr<std::thread> readSupervisorThread_;
139  std::unique_ptr<std::thread> dataArrangerThread_;
140  std::vector<std::thread*> workerThreads_;
141 
142  tbb::concurrent_queue<unsigned int> workerPool_;
143  std::vector<ReaderInfo> workerJob_;
144 
145  tbb::concurrent_queue<InputChunk*> freeChunks_;
146  tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
147 
149  std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
150  std::vector<unsigned int> tid_active_;
151 
152  std::atomic<bool> quit_threads_;
153  std::vector<unsigned int> thread_quit_signal;
154  bool setExceptionState_ = false;
156  std::condition_variable startupCv_;
157 
159  std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
161  std::vector<int> streamFileTracker_;
162  unsigned int checkEvery_ = 10;
163 
164  //supervisor thread wakeup
166  std::condition_variable cvWakeup_;
167 
168  //variables for the single buffered mode
169  int fileDescriptor_ = -1;
170 
171  std::atomic<bool> threadInit_;
172 
173  std::map<unsigned int, unsigned int> sourceEventsReport_;
175 
176  std::shared_ptr<DataMode> dataMode_;
177 };
178 
179 class RawInputFile : public InputFile {
180 public:
182  unsigned int lumi = 0,
183  std::string const& name = std::string(),
184  bool deleteFile = true,
185  int rawFd = -1,
186  uint64_t fileSize = 0,
187  uint16_t rawHeaderSize = 0,
188  uint32_t nChunks = 0,
189  int nEvents = 0,
190  DAQSource* parent = nullptr)
191  : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
193  bool advance(unsigned char*& dataPosition, const size_t size); // declaration only; defined out of line in DAQSource.cc. dataPosition is taken by non-const reference, so the callee can modify the caller's pointer
194  void advance(const size_t size) { // inline overload: adds `size` to chunkPosition_
195  chunkPosition_ += size;
197  } // NOTE(review): listing line 196 is absent from this extract, so the body shown may be missing one statement — check the original header
198 
199 private:
201 };
202 
203 #endif // EventFilter_Utilities_DAQSource_h
size
Write out results.
long initFileList()
Definition: DAQSource.cc:1402
Definition: fillJson.h:27
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
std::pair< RawInputFile *, InputChunk * > ReaderInfo
Definition: DAQSource.h:132
int currentFileIndex_
Definition: DAQSource.h:158
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
void threadError()
Definition: DAQSource.cc:1325
bool useFileBroker_
Definition: DAQSource.h:106
DAQSource * sourceParent_
Definition: DAQSource.h:200
RawInputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi=0, std::string const &name=std::string(), bool deleteFile=true, int rawFd=-1, uint64_t fileSize=0, uint16_t rawHeaderSize=0, uint32_t nChunks=0, int nEvents=0, DAQSource *parent=nullptr)
Definition: DAQSource.h:181
unsigned int checkEvery_
Definition: DAQSource.h:162
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
bool chunkIsFree_
Definition: DAQSource.h:135
std::condition_variable startupCv_
Definition: DAQSource.h:156
std::atomic< bool > threadInit_
Definition: DAQSource.h:171
unsigned int loopModeIterationInc_
Definition: DAQSource.h:112
Next checkNext() override
Definition: DAQSource.cc:254
static std::mutex mutex
Definition: Proxy.cc:8
void read(edm::EventPrincipal &eventPrincipal) override
Definition: DAQSource.cc:548
uint32_t GTPEventID_
Definition: DAQSource.h:121
const bool useL1EventID_
Definition: DAQSource.h:103
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1390
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: DAQSource.cc:327
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
const std::string dataModeConfig_
Definition: DAQSource.h:90
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
std::mutex mReader_
Definition: DAQSource.h:148
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1381
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
std::mutex mWakeup_
Definition: DAQSource.h:165
int fileDescriptor_
Definition: DAQSource.h:169
unsigned int currentLumiSection_
Definition: DAQSource.h:119
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:347
uint64_t eventChunkSize_
Definition: DAQSource.h:91
bool advance(unsigned char *&dataPosition, const size_t size)
Definition: DAQSource.cc:1340
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
Definition: DAQSource.cc:32
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
unsigned long eventsThisRun_
Definition: DAQSource.h:123
evf::EvFDaqDirector::FileStatus getNextDataBlock()
Definition: DAQSource.cc:373
std::string fuOutputDir_
Definition: DAQSource.h:115
int currentLumiSection() const
Definition: DAQSource.h:52
unsigned int fileListIndex_
Definition: DAQSource.h:110
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void readWorker(unsigned int tid)
Definition: DAQSource.cc:1064
unsigned int numBuffers_
Definition: DAQSource.h:95
std::unique_ptr< std::thread > dataArrangerThread_
Definition: DAQSource.h:139
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
bool useL1EventID() const
Definition: DAQSource.h:51
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
void makeEventWrapper(edm::EventPrincipal &eventPrincipal, edm::EventAuxiliary &aux)
Definition: DAQSource.h:54
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
edm::ProcessHistoryID & processHistoryID()
Definition: DAQSource.h:59
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
const bool fileListMode_
Definition: DAQSource.h:109
void advance(const size_t size)
Definition: DAQSource.h:194
const std::vector< unsigned int > testTCDSFEDRange_
Definition: DAQSource.h:104
uint32_t eventRunNumber_
Definition: DAQSource.h:120
unsigned long long uint64_t
Definition: Time.h:13
void dataArranger()
Definition: DAQSource.cc:591
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
~DAQSource() override
Definition: DAQSource.cc:185
bool fileListLoopMode()
Definition: DAQSource.h:57
uint32_t chunkPosition_
std::vector< int > streamFileTracker_
Definition: DAQSource.h:161
void rewind_() override
Definition: DAQSource.cc:589
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
unsigned int readBlocks_
Definition: DAQSource.h:94
ItemTypeInfo state() const
Definition: InputSource.h:361
void setMonStateSup(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1335
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
bool setExceptionState_
Definition: DAQSource.h:154
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DAQSource.cc:223
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
bool startedSupervisorThread_
Definition: DAQSource.h:137
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
unsigned int RunNumber_t
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
int eventRunNumber() const
Definition: DAQSource.h:53
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1436
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1330
bool exceptionState()
Definition: DAQSource.h:78
const bool verifyChecksum_
Definition: DAQSource.h:102
void readSupervisor()
Definition: DAQSource.cc:593
int events
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159
std::mutex monlock_
Definition: DAQSource.h:174
uint32_t bufferPosition_