|
|
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
29 #include <tbb/concurrent_hash_map.h>
30 #include <boost/asio.hpp>
40 class PathsAndConsumesOfModulesBase;
49 class DataPointDefinition;
114 uint16_t& rawHeaderSize,
116 bool& setExceptionState);
127 void createBoLSFile(
const uint32_t lumiSection,
bool checkIfExists)
const;
129 const uint32_t currentLumiSection,
130 bool doCreateBoLS =
true);
133 uint16_t& rawHeaderSize,
134 uint32_t& lsFromHeader,
135 int32_t& eventsFromHeader,
136 int64_t& fileSizeFromHeader,
143 uint16_t& rawHeaderSize,
144 int64_t& fileSizeFromHeader,
150 int64_t& fileSizeFromJson,
157 uint32_t& closedServerLS,
167 uint16_t& rawHeaderSize,
168 int32_t& serverEventsInNewFile_,
176 std::list<std::pair<
int, std::unique_ptr<InputFile>>>* filesToDelete) {
184 static struct flock
make_flock(short
type, short whence, off_t
start, off_t len, pid_t pid);
191 uint16_t& rawHeaderSize,
193 bool& setExceptionState);
280 std::unique_ptr<boost::asio::ip::tcp::resolver>
resolver_;
281 std::unique_ptr<boost::asio::ip::tcp::resolver::query>
query_;
283 std::unique_ptr<boost::asio::ip::tcp::socket>
socket_;
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void preallocate(edm::service::SystemBounds const &bounds)
unsigned long previousFileSize_
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void overrideRunNumber(unsigned int run)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
bool fileBrokerKeepAlive_
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
pthread_mutex_t init_lock_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string mergeTypePset_
std::string bu_run_open_dir_
std::string eolsFileName(const unsigned int ls) const
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
unsigned int getLumisectionToStart() const
void checkTransferSystemPSet(edm::ProcessContext const &pc)
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
std::string getStreamDestinations(std::string const &stream) const
std::string eorFileName() const
void tryInitializeFuLockFile()
boost::asio::io_service io_service_
unsigned int stop_ls_override_
std::string fileBrokerHost_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string & baseRunDir()
std::string getRunOpenDirPath() const
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string & buBaseRunDir()
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getInitFilePath(std::string const &stream) const
void openFULockfileStream(bool create)
FILE * bu_t_monitor_stream
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string stopFilePath_
bool isSingleStreamThread()
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
JSON (JavaScript Object Notation).
std::string selectedTransferMode_
evf::FastMonitoringService * fms_
bool useFileBroker() const
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool outputAdler32Recheck() const
unsigned int eolsNFilesIndex_
bool fileBrokerUseLocalLock_
FILE * bu_w_monitor_stream
static const std::vector< std::string > MergeTypeNames_
std::mutex * fileDeleteLockPtr_
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void preBeginRun(edm::GlobalContext const &globalContext)
std::string fileBrokerPort_
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool outputAdler32Recheck_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string getOpenInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned int fuLockPollInterval_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string findCurrentRunDir()
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
void createRunOpendirMaybe()
std::string getEoRFilePathOnFU() const
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
std::string hltSourceDirectory_
unsigned int startFromLS_
int readLastLSEntry(std::string const &file)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string findRunDir(unsigned int)
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
void createProcessingNotificationMaybe() const
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
unsigned int getRunNumber() const
void setFMS(evf::FastMonitoringService *fms)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
jsoncollector::DataPointDefinition * dpd_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void removeFile(unsigned int ls, unsigned int index)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
unsigned long long uint64_t
std::string & buBaseRunOpenDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string stopFilePathPid_
std::string initFileName(std::string const &stream) const
void postEndRun(edm::GlobalContext const &globalContext)
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
int getNFilesFromEoLS(std::string BUEoLSFile)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
bool fileBrokerHostFromCfg_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
void checkMergeTypePSet(edm::ProcessContext const &pc)