CMS 3D CMS Logo

EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <filesystem>
13 #include <iomanip>
14 #include <list>
15 #include <map>
16 #include <mutex>
17 #include <sstream>
18 #include <string>
19 #include <vector>
20 
21 //system headers
22 #include <sys/stat.h>
23 #include <sys/file.h>
24 #include <fcntl.h>
25 #include <cerrno>
26 #include <cstring>
27 #include <cstdio>
28 
29 #include <oneapi/tbb/concurrent_hash_map.h>
30 #include <boost/asio.hpp>
31 
32 class SystemBounds;
33 class GlobalContext;
34 class StreamID;
35 
36 struct InputFile;
37 struct InputChunk;
38 
39 namespace edm {
40  class PathsAndConsumesOfModulesBase;
41  class ProcessContext;
42 } // namespace edm
43 
44 namespace Json {
45  class Value;
46 }
47 
48 namespace jsoncollector {
49  class DataPointDefinition;
50 }
51 
52 namespace edm {
54 }
55 
56 namespace evf {
57 
59 
61 
63  public:
65 
68  void initRun();
69  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
72  void preBeginRun(edm::GlobalContext const& globalContext);
73  void postEndRun(edm::GlobalContext const& globalContext);
74  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
75  void overrideRunNumber(unsigned int run) { run_ = run; }
79  bool useFileBroker() const { return useFileBroker_; }
80 
82  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
83  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
84  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
85  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
86  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
87  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
88  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
89  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
91  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
96  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
98  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
99  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
100  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
101  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
102  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
103  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
104  std::string getEoRFilePath() const;
107  std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
109  void removeFile(unsigned int ls, unsigned int index);
110  void removeFile(std::string);
111 
112  FileStatus updateFuLock(unsigned int& ls,
113  std::string& nextFile,
114  uint32_t& fsize,
115  uint16_t& rawHeaderSize,
116  uint64_t& lockWaitTime,
117  bool& setExceptionState);
119  unsigned int getRunNumber() const { return run_; }
120  void lockInitLock();
121  void unlockInitLock();
122  void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
123  bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
124  unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
125  void lockFULocal();
126  void unlockFULocal();
127  void lockFULocal2();
128  void unlockFULocal2();
129  void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
130  void createLumiSectionFiles(const uint32_t lumiSection,
131  const uint32_t currentLumiSection,
132  bool doCreateBoLS,
133  bool doCreateEoLS);
134  static int parseFRDFileHeader(std::string const& rawSourcePath,
135  int& rawFd,
136  uint16_t& rawHeaderSize,
137  uint32_t& lsFromHeader,
138  int32_t& eventsFromHeader,
139  int64_t& fileSizeFromHeader,
140  bool requireHeader,
141  bool retry,
142  bool closeFile);
143  bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
144  int grabNextJsonFromRaw(std::string const& rawSourcePath,
145  int& rawFd,
146  uint16_t& rawHeaderSize,
147  int64_t& fileSizeFromHeader,
148  bool& fileFound,
149  uint32_t serverLS,
150  bool closeFile);
151  int grabNextJsonFile(std::string const& jsonSourcePath,
152  std::string const& rawSourcePath,
153  int64_t& fileSizeFromJson,
154  bool& fileFound);
155  int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
156 
157  EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
158  bool& serverState,
159  uint32_t& serverLS,
160  uint32_t& closedServerLS,
161  std::string& nextFileJson,
162  std::string& nextFileRaw,
163  bool& rawHeader,
164  int maxLS);
165 
166  FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
167  unsigned int& ls,
168  std::string& nextFile,
169  int& rawFd,
170  uint16_t& rawHeaderSize,
171  int32_t& serverEventsInNewFile_,
172  int64_t& fileSize,
173  uint64_t& thisLockWaitTimeUs);
174  void createRunOpendirMaybe();
176  int readLastLSEntry(std::string const& file);
177  unsigned int getLumisectionToStart() const;
178  unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
179  void setDeleteTracking(std::mutex* fileDeleteLock,
180  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
181  fileDeleteLockPtr_ = fileDeleteLock;
182  filesToDeletePtr_ = filesToDelete;
183  }
185  void checkMergeTypePSet(edm::ProcessContext const& pc);
188  static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
189  bool inputThrottled();
190  bool lumisectionDiscarded(unsigned int ls);
191 
192  private:
193  bool bumpFile(unsigned int& ls,
194  unsigned int& index,
195  std::string& nextFile,
196  uint32_t& fsize,
197  uint16_t& rawHeaderSize,
198  int maxLS,
199  bool& setExceptionState);
200  void openFULockfileStream(bool create);
201  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
202  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
203  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
205  std::string eolsFileName(const unsigned int ls) const;
206  std::string eorFileName() const;
207  int getNFilesFromEoLS(std::string BUEoLSFile);
208 
211  unsigned int run_;
218  unsigned int fuLockPollInterval_;
225 
226  unsigned int startFromLS_ = 1;
227 
236 
242 
248 
250 
251  unsigned long previousFileSize_;
252 
253  struct flock bu_w_flk;
254  struct flock bu_r_flk;
255  struct flock bu_w_fulk;
256  struct flock bu_r_fulk;
257  struct flock fu_rw_flk;
258  struct flock fu_rw_fulk;
259 
261 
263  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
264 
265  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
266 
267  unsigned int nStreams_ = 0;
268  unsigned int nThreads_ = 0;
269  unsigned int nConcurrentLumis_ = 0;
270 
271  bool readEolsDefinition_ = true;
272  unsigned int eolsNFilesIndex_ = 1;
275  unsigned int stop_ls_override_ = 0;
276 
277  std::shared_ptr<Json::Value> transferSystemJson_;
278  tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;
279 
280  //values initialized in .cc file
281  static const std::vector<std::string> MergeTypeNames_;
282 
283  //json parser
285 
286  boost::asio::io_service io_service_;
287  std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
288  std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
289  std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
290  std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
291 
294  };
295 } // namespace evf
296 
297 #endif
std::string & buBaseRunDir()
struct flock bu_w_fulk
unsigned int nThreads_
Definition: start.py:1
Definition: fillJson.h:27
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string run_string_
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
unsigned int getRunNumber() const
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
jsoncollector::DataPointDefinition * dpd_
bool useFileBroker() const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
bool outputAdler32Recheck() const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
static std::mutex mutex
Definition: Proxy.cc:8
pthread_mutex_t init_lock_
struct flock bu_r_fulk
std::string getEoLSFilePathOnFU(const unsigned int ls) const
bool lumisectionDiscarded(unsigned int ls)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Represents a JSON value.
Definition: value.h:99
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
unsigned int numConcurrentLumis() const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
void overrideRunNumber(unsigned int run)
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
std::string getRunOpenDirPath() const
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string eorFileName() const
std::string bu_base_dir_
std::string findCurrentRunDir()
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
JSON (JavaScript Object Notation).
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
unsigned int getStartLumisectionFromEnv() const
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43
std::string initFileName(std::string const &stream) const
HLT enums.
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk
unsigned int nStreams_
std::string bu_run_open_dir_
std::string eolsFileName(const unsigned int ls) const