1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR 2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR 29 #include <boost/asio.hpp> 39 class PathsAndConsumesOfModulesBase;
48 class DataPointDefinition;
114 uint16_t& rawHeaderSize,
116 bool& setExceptionState);
128 void createBoLSFile(
const uint32_t lumiSection,
bool checkIfExists)
const;
130 const uint32_t currentLumiSection,
135 uint16_t& rawHeaderSize,
136 uint16_t& rawDataType,
137 uint32_t& lsFromHeader,
138 int32_t& eventsFromHeader,
139 int64_t& fileSizeFromHeader,
146 uint16_t& rawHeaderSize,
147 int64_t& fileSizeFromHeader,
151 bool requireHeader =
true);
154 int64_t& fileSizeFromJson,
161 uint32_t& closedServerLS,
171 uint16_t& rawHeaderSize,
172 int32_t& serverEventsInNewFile_,
175 bool requireHeader =
true);
182 std::list<std::pair<
int, std::unique_ptr<InputFile>>>* filesToDelete) {
191 static struct flock
make_flock(short
type, short whence, off_t
start, off_t len, pid_t pid);
202 uint16_t& rawHeaderSize,
204 bool& setExceptionState);
289 std::unique_ptr<boost::asio::ip::tcp::resolver>
resolver_;
290 std::unique_ptr<boost::asio::ip::tcp::resolver::query>
query_;
292 std::unique_ptr<boost::asio::ip::tcp::socket>
socket_;
std::string & buBaseRunDir()
FILE * bu_t_monitor_stream
unsigned int nConcurrentLumis_
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string const & runString() const
std::vector< std::string > bu_base_dirs_all_
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
unsigned int getRunNumber() const
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
bool useFileBroker() const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
bool outputAdler32Recheck() const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
pthread_mutex_t init_lock_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
bool isSingleStreamThread()
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
unsigned int numConcurrentLumis() const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool fileBrokerHostFromCfg_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createRunOpendirMaybe()
std::string getInitTempFilePath(std::string const &stream) const
void overrideRunNumber(unsigned int run)
std::string getEoRFilePathOnFU() const
std::string hltSourceDirectory_
unsigned int startFromLS_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string getRunOpenDirPath() const
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry ®)
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
bool outputAdler32Recheck_
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string eorFileName() const
void tryInitializeFuLockFile()
std::string findCurrentRunDir()
std::string getStreamMergeType(std::string const &, MergeType defaultType) const
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string input_throttled_file_
std::string & buBaseRunOpenDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
JSON (JavaScript Object Notation).
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
void preBeginRun(edm::GlobalContext const &globalContext)
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getEoRFileName() const
evf::FastMonitoringService * fms_
std::vector< int > const & getBUBaseDirsNSources() const
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findRunDir(unsigned int)
std::string initFileName(std::string const &stream) const
bool fileBrokerUseLocalLock_
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
FILE * bu_w_monitor_stream
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
std::string fileBrokerPort_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
std::string getStreamDestinations(std::string const &) const
void preallocate(edm::service::SystemBounds const &bounds)
std::vector< int > bu_base_dirs_nSources_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
bool fileBrokerKeepAlive_
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::vector< std::string > const & getBUBaseDirs() const
std::string bu_run_open_dir_
std::string eolsFileName(const unsigned int ls) const