CMS 3D CMS Logo

EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <filesystem>
13 #include <iomanip>
14 #include <list>
15 #include <map>
16 #include <mutex>
17 #include <sstream>
18 #include <string>
19 #include <vector>
20 
21 //system headers
22 #include <sys/stat.h>
23 #include <sys/file.h>
24 #include <fcntl.h>
25 #include <cerrno>
26 #include <cstring>
27 #include <cstdio>
28 
29 #include <oneapi/tbb/concurrent_hash_map.h>
30 #include <boost/asio.hpp>
31 
32 class SystemBounds;
33 class GlobalContext;
34 class StreamID;
35 
36 class InputFile;
37 struct InputChunk;
38 
39 namespace edm {
40  class PathsAndConsumesOfModulesBase;
41  class ProcessContext;
42 } // namespace edm
43 
44 namespace Json {
45  class Value;
46 }
47 
48 namespace jsoncollector {
49  class DataPointDefinition;
50 }
51 
52 namespace edm {
54 }
55 
56 namespace evf {
57 
59 
61 
63  public:
65 
68  void initRun();
69  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
72  void preBeginRun(edm::GlobalContext const& globalContext);
73  void postEndRun(edm::GlobalContext const& globalContext);
74  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
75  void overrideRunNumber(unsigned int run) { run_ = run; }  // Overwrites the stored run number (run_) with `run`; nothing else is updated here.
76  std::string const& runString() const { return run_string_; }  // Returns a const reference to the run_string_ member (no copy is made).
80  bool useFileBroker() const { return useFileBroker_; }  // Returns the current value of the useFileBroker_ flag.
81 
83  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
84  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
85  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
86  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
87  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
88  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
89  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
91  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
92  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
97  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
99  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
100  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
101  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
102  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
103  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
104  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
105  std::string getEoRFilePath() const;
106  std::string getEoRFileName() const;
109  std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }  // Returns run_dir_ with "/open" appended, as a new string.
111  void removeFile(std::string);
112 
113  FileStatus updateFuLock(unsigned int& ls,
114  std::string& nextFile,
115  uint32_t& fsize,
116  uint16_t& rawHeaderSize,
117  uint64_t& lockWaitTime,
118  bool& setExceptionState);
120  unsigned int getRunNumber() const { return run_; }  // Returns the stored run number (run_), which overrideRunNumber() can replace.
121  void lockInitLock();
122  void unlockInitLock();
123  void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }  // Stores the pointer in fms_. NOTE(review): no ownership transfer is visible here -- confirm the service's lifetime with callers.
124  bool isSingleStreamThread() const { return nStreams_ == 1 && nThreads_ == 1; }  // True only when both nStreams_ and nThreads_ equal 1. const: it only reads members, so it is callable on a const director.
125  unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }  // Returns nConcurrentLumis_ (declared with an in-class default of 0).
126  void lockFULocal();
127  void unlockFULocal();
128  void lockFULocal2();
129  void unlockFULocal2();
130  void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
131  void createLumiSectionFiles(const uint32_t lumiSection,
132  const uint32_t currentLumiSection,
133  bool doCreateBoLS,
134  bool doCreateEoLS);
135  static int parseFRDFileHeader(std::string const& rawSourcePath,
136  int& rawFd,
137  uint16_t& rawHeaderSize,
138  uint16_t& rawDataType,
139  uint32_t& lsFromHeader,
140  int32_t& eventsFromHeader,
141  int64_t& fileSizeFromHeader,
142  bool requireHeader,
143  bool retry,
144  bool closeFile);
145  bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
146  int grabNextJsonFromRaw(std::string const& rawSourcePath,
147  int& rawFd,
148  uint16_t& rawHeaderSize,
149  int64_t& fileSizeFromHeader,
150  bool& fileFound,
151  uint32_t serverLS,
152  bool closeFile,
153  bool requireHeader = true);
154  int grabNextJsonFile(std::string const& jsonSourcePath,
155  std::string const& rawSourcePath,
156  int64_t& fileSizeFromJson,
157  bool& fileFound);
158  int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
159 
160  EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
161  bool& serverState,
162  uint32_t& serverLS,
163  uint32_t& closedServerLS,
164  std::string& nextFileJson,
165  std::string& nextFileRaw,
166  bool& rawHeader,
167  int maxLS);
168 
169  FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
170  unsigned int& ls,
171  std::string& nextFile,
172  int& rawFd,
173  uint16_t& rawHeaderSize,
174  int32_t& serverEventsInNewFile_,
175  int64_t& fileSize,
176  uint64_t& thisLockWaitTimeUs,
177  bool requireHeader = true);
178  void createRunOpendirMaybe();
180  int readLastLSEntry(std::string const& file);
181  unsigned int getLumisectionToStart() const;
182  unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }  // Returns startFromLS_ (in-class default 1). NOTE(review): the name suggests it is filled from the environment; the assignment is not visible in this header -- confirm in the .cc.
183  void setDeleteTracking(std::mutex* fileDeleteLock,  // Registers the caller's delete-tracking mutex and file list by storing both pointers.
184  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
185  fileDeleteLockPtr_ = fileDeleteLock;  // pointer is stored as-is; the mutex itself is not copied or locked here
186  filesToDeletePtr_ = filesToDelete;  // NOTE(review): both pointees presumably must outlive this object's use of them -- confirm with caller
187  }
188 
190  void checkMergeTypePSet(edm::ProcessContext const& pc);
193  static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
194  bool inputThrottled();
195  bool lumisectionDiscarded(unsigned int ls);
196  std::vector<std::string> const& getBUBaseDirs() const { return bu_base_dirs_all_; }  // Returns a const reference to bu_base_dirs_all_ (no copy is made).
197 
198  private:
199  bool bumpFile(unsigned int& ls,
200  unsigned int& index,
201  std::string& nextFile,
202  uint32_t& fsize,
203  uint16_t& rawHeaderSize,
204  int maxLS,
205  bool& setExceptionState);
206  void openFULockfileStream(bool create);
207  static bool checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path);
208  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
209  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
210  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
212  std::string eolsFileName(const unsigned int ls) const;
213  std::string eorFileName() const;
214  int getNFilesFromEoLS(std::string BUEoLSFile);
215 
218  std::vector<std::string> bu_base_dirs_all_;
219  unsigned int run_;
226  unsigned int fuLockPollInterval_;
233 
234  unsigned int startFromLS_ = 1;
235 
244 
250 
256 
258 
259  unsigned long previousFileSize_;
260 
261  struct flock bu_w_flk;
262  struct flock bu_r_flk;
263  struct flock bu_w_fulk;
264  struct flock bu_r_fulk;
265  struct flock fu_rw_flk;
266  struct flock fu_rw_fulk;
267 
269 
271  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
272 
273  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
274 
275  unsigned int nStreams_ = 0;
276  unsigned int nThreads_ = 0;
277  unsigned int nConcurrentLumis_ = 0;
278 
279  bool readEolsDefinition_ = true;
280  unsigned int eolsNFilesIndex_ = 1;
283  unsigned int stop_ls_override_ = 0;
284 
285  std::shared_ptr<Json::Value> transferSystemJson_;
286  tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;
287 
288  //values initialized in .cc file
289  static const std::vector<std::string> MergeTypeNames_;
290 
291  //json parser
293 
294  boost::asio::io_service io_service_;
295  std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
296  std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
297  std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
298  std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
299 
302  };
303 } // namespace evf
304 
305 #endif
std::string & buBaseRunDir()
struct flock bu_w_fulk
unsigned int nThreads_
Definition: start.py:1
Definition: fillJson.h:27
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string const & runString() const
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
unsigned int getRunNumber() const
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
bool useFileBroker() const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
bool outputAdler32Recheck() const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
static std::mutex mutex
Definition: Proxy.cc:8
pthread_mutex_t init_lock_
struct flock bu_r_fulk
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
Represents a JSON value.
Definition: value.h:99
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
unsigned int numConcurrentLumis() const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
void overrideRunNumber(unsigned int run)
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string getRunOpenDirPath() const
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string eorFileName() const
std::string bu_base_dir_
std::string findCurrentRunDir()
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
JSON (JavaScript Object Notation).
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getEoRFileName() const
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43
std::string initFileName(std::string const &stream) const
HLT enums.
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::vector< std::string > const & getBUBaseDirs() const
struct flock bu_r_flk
unsigned int nStreams_
std::string bu_run_open_dir_
std::string eolsFileName(const unsigned int ls) const