CMS 3D CMS Logo

EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <filesystem>
13 #include <iomanip>
14 #include <list>
15 #include <map>
16 #include <mutex>
17 #include <sstream>
18 #include <string>
19 #include <vector>
20 
21 //system headers
22 #include <sys/stat.h>
23 #include <sys/file.h>
24 #include <fcntl.h>
25 #include <cerrno>
26 #include <cstring>
27 #include <cstdio>
28 
29 #include <boost/asio.hpp>
30 
31 class SystemBounds;
32 class GlobalContext;
33 class StreamID;
34 
35 class InputFile;
36 struct InputChunk;
37 
38 namespace edm {
39  class PathsAndConsumesOfModulesBase;
40  class ProcessContext;
41 } // namespace edm
42 
43 namespace jsoncollector {
44  class DataPointDefinition;
45 
46  namespace Json {
47  class Value;
48  }
49 } // namespace jsoncollector
50 
51 namespace edm {
53 }
54 
55 namespace evf {
56 
58 
60 
62  public:
64 
// Run set-up entry point. Only declared here; the definition is not part of this header.
67  void initRun();
// Static hook that receives the framework's ConfigurationDescriptions by reference.
68  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
// The next three all take the GlobalContext by const reference and return nothing.
// NOTE(review): presumably registered as begin-run / end-run / end-lumi framework callbacks -- confirm in the .cc file.
70  void preBeginRun(edm::GlobalContext const& globalContext);
71  void postEndRun(edm::GlobalContext const& globalContext);
72  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
// Replaces the stored run number (run_) with the supplied value.
73  void overrideRunNumber(unsigned int run) { run_ = run; }
// Read-only reference to the run_string_ member (no copy is made).
74  std::string const& runString() const { return run_string_; }
// Returns the current value of the useFileBroker_ flag.
78  bool useFileBroker() const { return useFileBroker_; }
79 
// Path / file-name builders. Each one is const and returns a std::string by value.
// Apart from getRunOpenDirPath() their bodies are not part of this header.
//
// Variants that take (ls, index): a lumisection number plus a file index.
81  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
82  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
83  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
84  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
// Variants that take (ls, stream): a lumisection number plus a stream name passed by const reference.
85  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
86  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
87  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
88  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
89  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
95  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
97  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
98  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
99  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
// Marker-file paths.
// NOTE(review): names suggest end-of-lumisection (EoLS), begin-of-lumisection (BoLS) and end-of-run (EoR) files -- confirm.
100  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
101  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
102  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
103  std::string getEoRFilePath() const;
104  std::string getEoRFileName() const;
// Builds the result inline: the run_dir_ member with "/open" appended.
107  std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
// Takes its std::string argument by value, so the caller's string is copied (or moved) at each call.
109  void removeFile(std::string);
110 
// Returns a FileStatus. Every parameter is a non-const reference, so the callee is able to
// write to each of them.
// NOTE(review): which arguments are pure outputs and which are in/out is not visible from this declaration -- confirm in the .cc file.
111  FileStatus updateFuLock(unsigned int& ls,
112  std::string& nextFile,
113  uint32_t& fsize,
114  uint16_t& rawHeaderSize,
115  uint64_t& lockWaitTime,
116  bool& setExceptionState);
// Returns the stored run number (run_).
118  unsigned int getRunNumber() const { return run_; }
// Lock / unlock pair; only declared here.
// NOTE(review): presumably these act on the init_lock_ pthread mutex declared further down -- confirm.
119  void lockInitLock();
120  void unlockInitLock();
// Stores the given FastMonitoringService pointer in fms_. The pointer is kept as-is (raw, non-owning).
121  void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
// True only when both nStreams_ and nThreads_ equal 1.
// NOTE(review): reads members only but is not const-qualified, unlike the neighbouring accessors.
122  bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
// Returns nConcurrentLumis_.
123  unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
// Two further lock / unlock pairs; only declared here, so what they lock is not visible in this header.
124  void lockFULocal();
125  void unlockFULocal();
126  void lockFULocal2();
127  void unlockFULocal2();
// Const member taking a lumisection number and a checkIfExists flag; only declared here.
128  void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
// Takes two lumisection numbers plus two flags selecting BoLS and/or EoLS file creation.
// Non-const, unlike createBoLSFile above.
129  void createLumiSectionFiles(const uint32_t lumiSection,
130  const uint32_t currentLumiSection,
131  bool doCreateBoLS,
132  bool doCreateEoLS);
// Static: callable without an EvFDaqDirector instance.
// Input: rawSourcePath (const reference) and three by-value flags (requireHeader, retry, closeFile).
// Writable: rawFd and the five header-related values, all passed by non-const reference.
// NOTE(review): the meaning of the int return value is not visible in this header -- confirm in the .cc file.
133  static int parseFRDFileHeader(std::string const& rawSourcePath,
134  int& rawFd,
135  uint16_t& rawHeaderSize,
136  uint16_t& rawDataType,
137  uint32_t& lsFromHeader,
138  int32_t& eventsFromHeader,
139  int64_t& fileSizeFromHeader,
140  bool requireHeader,
141  bool retry,
142  bool closeFile);
// Returns a bool and can write rawHeaderSize through the reference parameter.
143  bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
// rawFd, rawHeaderSize, fileSizeFromHeader and fileFound are non-const references (writable by the callee);
// serverLS and closeFile are by-value inputs. requireHeader defaults to true when omitted.
// NOTE(review): the meaning of the int return value is not visible here -- confirm in the .cc file.
144  int grabNextJsonFromRaw(std::string const& rawSourcePath,
145  int& rawFd,
146  uint16_t& rawHeaderSize,
147  int64_t& fileSizeFromHeader,
148  bool& fileFound,
149  uint32_t serverLS,
150  bool closeFile,
151  bool requireHeader = true);
// Takes both a JSON path and a raw path by const reference; fileSizeFromJson and fileFound are writable.
152  int grabNextJsonFile(std::string const& jsonSourcePath,
153  std::string const& rawSourcePath,
154  int64_t& fileSizeFromJson,
155  bool& fileFound);
// Same family, but takes the JSON path as a std::filesystem::path rather than a std::string.
156  int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
157 
// Returns a FileStatus (spelled here with the EvFDaqDirector:: qualifier, unlike neighbouring declarations).
// All parameters except maxLS are non-const references, so the callee can write to them;
// maxLS is a plain int passed by value.
// NOTE(review): presumably performs the network exchange using the boost::asio members below -- confirm in the .cc file.
158  EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
159  bool& serverState,
160  uint32_t& serverLS,
161  uint32_t& closedServerLS,
162  std::string& nextFileJson,
163  std::string& nextFileRaw,
164  bool& rawHeader,
165  int maxLS);
166 
// Returns a FileStatus. currentLumiSection is a by-value input; every other parameter except
// requireHeader is a non-const reference the callee can write to. requireHeader defaults to true.
// Note that serverEventsInNewFile_ is a parameter despite carrying the trailing underscore used for members.
167  FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
168  unsigned int& ls,
169  std::string& nextFile,
170  int& rawFd,
171  uint16_t& rawHeaderSize,
172  int32_t& serverEventsInNewFile_,
173  int64_t& fileSize,
174  uint64_t& thisLockWaitTimeUs,
175  bool requireHeader = true);
// Only declared here.
// NOTE(review): name suggests it creates the run "open" directory if absent -- confirm in the .cc file.
176  void createRunOpendirMaybe();
// Takes a file path by const reference and returns an int; semantics of the value are not visible here.
178  int readLastLSEntry(std::string const& file);
179  unsigned int getLumisectionToStart() const;
// Returns startFromLS_, whose in-class default is 1.
180  unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
// Stores both raw pointers in fileDeleteLockPtr_ / filesToDeletePtr_ without copying the pointees.
// NOTE(review): nothing here takes ownership, so the mutex and list presumably must outlive this object -- confirm with the caller.
// NOTE(review): std::unique_ptr is used, but <memory> is not among the includes visible in this listing.
181  void setDeleteTracking(std::mutex* fileDeleteLock,
182  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
183  fileDeleteLockPtr_ = fileDeleteLock;
184  filesToDeletePtr_ = filesToDelete;
185  }
186 
189  return MergeTypeNames_[defaultType];
190  }
// Static factory returning a struct flock by value from the five supplied values.
// NOTE(review): presumably maps them onto l_type, l_whence, l_start, l_len and l_pid -- confirm in the .cc file.
191  static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
// Boolean queries; only declared here.
192  bool inputThrottled();
193  bool lumisectionDiscarded(unsigned int ls);
// Read-only references to the two BU base-directory vectors (no copy is made).
194  std::vector<std::string> const& getBUBaseDirs() const { return bu_base_dirs_all_; }
195  std::vector<int> const& getBUBaseDirsNSources() const { return bu_base_dirs_nSources_; }
196 
197  private:
// Private helper with the same reference parameters as updateFuLock (ls, nextFile, fsize,
// rawHeaderSize, setExceptionState) plus a writable file index and a by-value maxLS.
198  bool bumpFile(unsigned int& ls,
199  unsigned int& index,
200  std::string& nextFile,
201  uint32_t& fsize,
202  uint16_t& rawHeaderSize,
203  int maxLS,
204  bool& setExceptionState);
// The flag is named "create"; its exact effect is defined in the .cc file.
205  void openFULockfileStream(bool create);
// Static helper taking a buffer, a file descriptor-like int, the buffer size and the path.
// NOTE(review): presumably reads buf_sz bytes from infile into buf and reports success -- confirm.
206  static bool checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path);
// File-name stem / name builders; all const, all return std::string by value.
207  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
208  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
209  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
211  std::string eolsFileName(const unsigned int ls) const;
212  std::string eorFileName() const;
// Takes the EoLS file path by value (copied at each call) and returns an int.
213  int getNFilesFromEoLS(std::string BUEoLSFile);
214 
// Data members. Several member declarations of the original header are absent from this listing.
//
// Exposed read-only through getBUBaseDirs() / getBUBaseDirsNSources().
217  std::vector<std::string> bu_base_dirs_all_;
218  std::vector<int> bu_base_dirs_nSources_;
// No in-class initializers for the next two; run_ can be replaced through overrideRunNumber().
219  unsigned int run_;
226  unsigned int fuLockPollInterval_;
230

// Returned by getStartLumisectionFromEnv(); defaults to 1.
231  unsigned int startFromLS_ = 1;
232

241

247

253

255

// No in-class initializer.
256  unsigned long previousFileSize_;
257

// Six file-lock descriptors with no in-class initializers.
// NOTE(review): presumably filled via make_flock() in the constructor -- confirm in the .cc file.
258  struct flock bu_w_flk;
259  struct flock bu_r_flk;
260  struct flock bu_w_fulk;
261  struct flock bu_r_fulk;
262  struct flock fu_rw_flk;
263  struct flock fu_rw_fulk;
264

266

// Raw pointer to an externally owned list; stays nullptr until setDeleteTracking() is called.
268  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
269

// Statically initialised pthread mutex.
270  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
271

// All three default to 0; nStreams_ and nThreads_ feed isSingleStreamThread(),
// nConcurrentLumis_ is returned by numConcurrentLumis().
272  unsigned int nStreams_ = 0;
273  unsigned int nThreads_ = 0;
274  unsigned int nConcurrentLumis_ = 0;
275

276  bool readEolsDefinition_ = true;
277  unsigned int eolsNFilesIndex_ = 1;
280  unsigned int stop_ls_override_ = 0;
281

282  //values initialized in .cc file
283  static const std::vector<std::string> MergeTypeNames_;
284

// The member this label refers to is not present in this listing.
285  //json parser
287

// io_service_ is declared before the objects below, so it is destroyed after them
// (members are destroyed in reverse declaration order).
// The unique_ptr members hold null unless the constructor or later code assigns them.
288  boost::asio::io_service io_service_;
289  std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
290  std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
291  std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
292  std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
293 
296  };
297 } // namespace evf
298 
299 #endif
std::string & buBaseRunDir()
struct flock bu_w_fulk
unsigned int nThreads_
Definition: start.py:1
Definition: fillJson.h:27
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string const & runString() const
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
unsigned int getRunNumber() const
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
bool useFileBroker() const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
bool outputAdler32Recheck() const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
static std::mutex mutex
Definition: Proxy.cc:8
pthread_mutex_t init_lock_
struct flock bu_r_fulk
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
unsigned int numConcurrentLumis() const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
void overrideRunNumber(unsigned int run)
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string getRunOpenDirPath() const
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string eorFileName() const
std::string bu_base_dir_
std::string findCurrentRunDir()
std::string getStreamMergeType(std::string const &, MergeType defaultType) const
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getEoRFileName() const
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::vector< int > const & getBUBaseDirsNSources() const
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43
std::string initFileName(std::string const &stream) const
HLT enums.
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
std::string getStreamDestinations(std::string const &) const
void preallocate(edm::service::SystemBounds const &bounds)
std::vector< int > bu_base_dirs_nSources_
Represents a JSON value.
Definition: value.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
JSON (JavaScript Object Notation).
Definition: DataPoint.h:26
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::vector< std::string > const & getBUBaseDirs() const
struct flock bu_r_flk
unsigned int nStreams_
std::string bu_run_open_dir_
std::string eolsFileName(const unsigned int ls) const