1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR 2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR 29 #include <oneapi/tbb/concurrent_hash_map.h> 30 #include <boost/asio.hpp> 40 class PathsAndConsumesOfModulesBase;
49 class DataPointDefinition;
115 uint16_t& rawHeaderSize,
117 bool& setExceptionState);
129 void createBoLSFile(
const uint32_t lumiSection,
bool checkIfExists)
const;
131 const uint32_t currentLumiSection,
136 uint16_t& rawHeaderSize,
137 uint32_t& lsFromHeader,
138 int32_t& eventsFromHeader,
139 int64_t& fileSizeFromHeader,
146 uint16_t& rawHeaderSize,
147 int64_t& fileSizeFromHeader,
153 int64_t& fileSizeFromJson,
160 uint32_t& closedServerLS,
170 uint16_t& rawHeaderSize,
171 int32_t& serverEventsInNewFile_,
180 std::list<std::pair<
int, std::unique_ptr<InputFile>>>* filesToDelete) {
188 static struct flock
make_flock(short
type, short whence, off_t
start, off_t len, pid_t pid);
197 uint16_t& rawHeaderSize,
199 bool& setExceptionState);
287 std::unique_ptr<boost::asio::ip::tcp::resolver>
resolver_;
288 std::unique_ptr<boost::asio::ip::tcp::resolver::query>
query_;
290 std::unique_ptr<boost::asio::ip::tcp::socket>
socket_;
std::string & buBaseRunDir()
FILE * bu_t_monitor_stream
unsigned int nConcurrentLumis_
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
unsigned int getRunNumber() const
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
jsoncollector::DataPointDefinition * dpd_
bool useFileBroker() const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
bool outputAdler32Recheck() const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
pthread_mutex_t init_lock_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
bool lumisectionDiscarded(unsigned int ls)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool isSingleStreamThread()
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
unsigned int numConcurrentLumis() const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool fileBrokerHostFromCfg_
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createRunOpendirMaybe()
std::string getInitTempFilePath(std::string const &stream) const
void overrideRunNumber(unsigned int run)
std::string getEoRFilePathOnFU() const
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
std::string getRunOpenDirPath() const
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry ®)
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
bool outputAdler32Recheck_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string eorFileName() const
void tryInitializeFuLockFile()
std::string findCurrentRunDir()
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
JSON (JavaScript Object Notation).
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
void preBeginRun(edm::GlobalContext const &globalContext)
unsigned int getStartLumisectionFromEnv() const
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned long long uint64_t
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
evf::FastMonitoringService * fms_
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findRunDir(unsigned int)
std::string initFileName(std::string const &stream) const
bool fileBrokerUseLocalLock_
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
FILE * bu_w_monitor_stream
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
bool fileBrokerKeepAlive_
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
std::string bu_run_open_dir_
std::string eolsFileName(const unsigned int ls) const