CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <filesystem>
13 #include <iomanip>
14 #include <list>
15 #include <map>
16 #include <mutex>
17 #include <sstream>
18 #include <string>
19 #include <vector>
20 
21 //system headers
22 #include <sys/stat.h>
23 #include <sys/file.h>
24 #include <fcntl.h>
25 #include <cerrno>
26 #include <cstring>
27 #include <cstdio>
28 
29 #include <oneapi/tbb/concurrent_hash_map.h>
30 #include <boost/asio.hpp>
31 
32 class SystemBounds;
33 class GlobalContext;
34 class StreamID;
35 
36 struct InputFile;
37 struct InputChunk;
38 
39 namespace edm {
40  class PathsAndConsumesOfModulesBase;
41  class ProcessContext;
42 } // namespace edm
43 
44 namespace Json {
45  class Value;
46 }
47 
48 namespace jsoncollector {
49  class DataPointDefinition;
50 }
51 
52 namespace edm {
54 }
55 
56 namespace evf {
57 
59 
60  class FastMonitoringService;
61 
63  public:
65 
68  void initRun();
69  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
70  void preallocate(edm::service::SystemBounds const& bounds);
72  void preBeginRun(edm::GlobalContext const& globalContext);
73  void postEndRun(edm::GlobalContext const& globalContext);
74  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
75  void overrideRunNumber(unsigned int run) { run_ = run; }
79  bool useFileBroker() const { return useFileBroker_; }
80 
82  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
83  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
84  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
85  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
86  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
87  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
88  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
89  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
91  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
95  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
97  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
98  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
99  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
100  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
101  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
102  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
103  std::string getEoRFilePath() const;
106  std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
108  void removeFile(unsigned int ls, unsigned int index);
109  void removeFile(std::string);
110 
111  FileStatus updateFuLock(unsigned int& ls,
112  std::string& nextFile,
113  uint32_t& fsize,
114  uint16_t& rawHeaderSize,
115  uint64_t& lockWaitTime,
116  bool& setExceptionState);
118  unsigned int getRunNumber() const { return run_; }
119  void lockInitLock();
120  void unlockInitLock();
121  void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
122  bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
123  void lockFULocal();
124  void unlockFULocal();
125  void lockFULocal2();
126  void unlockFULocal2();
127  void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
128  void createLumiSectionFiles(const uint32_t lumiSection,
129  const uint32_t currentLumiSection,
130  bool doCreateBoLS,
131  bool doCreateEoLS);
132  static int parseFRDFileHeader(std::string const& rawSourcePath,
133  int& rawFd,
134  uint16_t& rawHeaderSize,
135  uint32_t& lsFromHeader,
136  int32_t& eventsFromHeader,
137  int64_t& fileSizeFromHeader,
138  bool requireHeader,
139  bool retry,
140  bool closeFile);
141  bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
142  int grabNextJsonFromRaw(std::string const& rawSourcePath,
143  int& rawFd,
144  uint16_t& rawHeaderSize,
145  int64_t& fileSizeFromHeader,
146  bool& fileFound,
147  uint32_t serverLS,
148  bool closeFile);
149  int grabNextJsonFile(std::string const& jsonSourcePath,
150  std::string const& rawSourcePath,
151  int64_t& fileSizeFromJson,
152  bool& fileFound);
153  int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
154 
155  EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
156  bool& serverState,
157  uint32_t& serverLS,
158  uint32_t& closedServerLS,
159  std::string& nextFileJson,
160  std::string& nextFileRaw,
161  bool& rawHeader,
162  int maxLS);
163 
164  FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
165  unsigned int& ls,
166  std::string& nextFile,
167  int& rawFd,
168  uint16_t& rawHeaderSize,
169  int32_t& serverEventsInNewFile_,
170  int64_t& fileSize,
171  uint64_t& thisLockWaitTimeUs);
172  void createRunOpendirMaybe();
174  int readLastLSEntry(std::string const& file);
175  unsigned int getLumisectionToStart() const;
176  unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
177  void setDeleteTracking(std::mutex* fileDeleteLock,
178  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
179  fileDeleteLockPtr_ = fileDeleteLock;
180  filesToDeletePtr_ = filesToDelete;
181  }
183  void checkMergeTypePSet(edm::ProcessContext const& pc);
186  static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
187  bool inputThrottled();
188 
189  private:
190  bool bumpFile(unsigned int& ls,
191  unsigned int& index,
192  std::string& nextFile,
193  uint32_t& fsize,
194  uint16_t& rawHeaderSize,
195  int maxLS,
196  bool& setExceptionState);
197  void openFULockfileStream(bool create);
198  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
199  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
200  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
202  std::string eolsFileName(const unsigned int ls) const;
203  std::string eorFileName() const;
204  int getNFilesFromEoLS(std::string BUEoLSFile);
205 
208  unsigned int run_;
215  unsigned int fuLockPollInterval_;
222 
223  unsigned int startFromLS_ = 1;
224 
233 
239 
245 
247 
248  unsigned long previousFileSize_;
249 
250  struct flock bu_w_flk;
251  struct flock bu_r_flk;
252  struct flock bu_w_fulk;
253  struct flock bu_r_fulk;
254  struct flock fu_rw_flk;
255  struct flock fu_rw_fulk;
256 
258 
260  std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
261 
262  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
263 
264  unsigned int nStreams_ = 0;
265  unsigned int nThreads_ = 0;
266 
267  bool readEolsDefinition_ = true;
268  unsigned int eolsNFilesIndex_ = 1;
271  unsigned int stop_ls_override_ = 0;
272 
273  std::shared_ptr<Json::Value> transferSystemJson_;
274  tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;
275 
276  //values initialized in .cc file
277  static const std::vector<std::string> MergeTypeNames_;
278 
279  //json parser
281 
282  boost::asio::io_service io_service_;
283  std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
284  std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
285  std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
286  std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
287 
289  };
290 } // namespace evf
291 
292 #endif
std::string & buBaseRunDir()
struct flock bu_w_fulk
unsigned int nThreads_
std::string getStreamDestinations(std::string const &stream) const
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string run_string_
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
std::string fulockfile_
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
jsoncollector::DataPointDefinition * dpd_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
static std::mutex mutex
Definition: Proxy.cc:8
pthread_mutex_t init_lock_
struct flock bu_r_fulk
def ls
Definition: eostools.py:349
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
void createProcessingNotificationMaybe() const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
std::string eolsFileName(const unsigned int ls) const
Represents a JSON value.
Definition: value.h:99
std::string getEoLSFilePathOnBU(const unsigned int ls) const
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getEoRFilePath() const
unsigned long previousFileSize_
bool useFileBroker() const
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void overrideRunNumber(unsigned int run)
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::mutex * fileDeleteLockPtr_
std::string mergeTypePset_
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool outputAdler32Recheck() const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string stopFilePath_
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string bu_base_dir_
std::string findCurrentRunDir()
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
int readLastLSEntry(std::string const &file)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void openFULockfileStream(bool create)
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned int getLumisectionToStart() const
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::string eorFileName() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43
void postEndRun(edm::GlobalContext const &globalContext)
static const std::vector< std::string > MergeTypeNames_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
unsigned int getStartLumisectionFromEnv() const
unsigned int getRunNumber() const
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string getRunOpenDirPath() const
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
std::string getFFFParamsFilePathOnBU() const
struct flock bu_r_flk
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
unsigned int nStreams_
std::string getEoRFilePathOnFU() const
std::string bu_run_open_dir_
std::string initFileName(std::string const &stream) const