1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
29 #include <oneapi/tbb/concurrent_hash_map.h>
30 #include <boost/asio.hpp>
40 class PathsAndConsumesOfModulesBase;
48 namespace jsoncollector {
49 class DataPointDefinition;
60 class FastMonitoringService;
114 uint16_t& rawHeaderSize,
116 bool& setExceptionState);
127 void createBoLSFile(
const uint32_t lumiSection,
bool checkIfExists)
const;
129 const uint32_t currentLumiSection,
134 uint16_t& rawHeaderSize,
135 uint32_t& lsFromHeader,
136 int32_t& eventsFromHeader,
137 int64_t& fileSizeFromHeader,
144 uint16_t& rawHeaderSize,
145 int64_t& fileSizeFromHeader,
151 int64_t& fileSizeFromJson,
158 uint32_t& closedServerLS,
168 uint16_t& rawHeaderSize,
169 int32_t& serverEventsInNewFile_,
178 std::list<std::pair<
int, std::unique_ptr<InputFile>>>* filesToDelete) {
186 static struct flock
make_flock(short
type, short whence, off_t
start, off_t len, pid_t pid);
194 uint16_t& rawHeaderSize,
196 bool& setExceptionState);
283 std::unique_ptr<boost::asio::ip::tcp::resolver>
resolver_;
284 std::unique_ptr<boost::asio::ip::tcp::resolver::query>
query_;
286 std::unique_ptr<boost::asio::ip::tcp::socket>
socket_;
std::string & buBaseRunDir()
FILE * bu_t_monitor_stream
std::string getStreamDestinations(std::string const &stream) const
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
jsoncollector::DataPointDefinition * dpd_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
pthread_mutex_t init_lock_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
void createProcessingNotificationMaybe() const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool isSingleStreamThread()
std::string eolsFileName(const unsigned int ls) const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getEoRFilePath() const
bool fileBrokerHostFromCfg_
unsigned long previousFileSize_
bool useFileBroker() const
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void createRunOpendirMaybe()
void overrideRunNumber(unsigned int run)
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::mutex * fileDeleteLockPtr_
std::string mergeTypePset_
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry ®)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
bool outputAdler32Recheck_
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool outputAdler32Recheck() const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string stopFilePath_
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
void tryInitializeFuLockFile()
std::string findCurrentRunDir()
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
int readLastLSEntry(std::string const &file)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void openFULockfileStream(bool create)
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
void preBeginRun(edm::GlobalContext const &globalContext)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned int getLumisectionToStart() const
unsigned long long uint64_t
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
evf::FastMonitoringService * fms_
std::string eorFileName() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string findRunDir(unsigned int)
bool fileBrokerUseLocalLock_
void postEndRun(edm::GlobalContext const &globalContext)
FILE * bu_w_monitor_stream
static const std::vector< std::string > MergeTypeNames_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
unsigned int getStartLumisectionFromEnv() const
unsigned int getRunNumber() const
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
bool fileBrokerKeepAlive_
std::string getRunOpenDirPath() const
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
std::string getFFFParamsFilePathOnBU() const
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
std::string getEoRFilePathOnFU() const
std::string bu_run_open_dir_
std::string initFileName(std::string const &stream) const