26 #include <boost/algorithm/string.hpp> 37 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {
// Constant table of merge-type name strings defined in the EvFDaqDirector scope.
// Entries, in order: the empty string, "DAT", "PB" and "JSNDATA".
// NOTE(review): presumably indexed by a merge-type enumeration declared in the
// header, with the empty string as the "no type" slot -- confirm against the
// class declaration, which is not visible here.
"",
"DAT",
"PB",
"JSNDATA"};
40 : base_dir_(
pset.getUntrackedParameter<
std::
string>(
"baseDir")),
41 bu_base_dir_(
pset.getUntrackedParameter<
std::
string>(
"buBaseDir")),
43 bu_base_dirs_nSources_(
pset.getUntrackedParameter<
std::
vector<
int>>(
"buBaseDirsNumStreams")),
44 run_(
pset.getUntrackedParameter<unsigned
int>(
"runNumber")),
45 useFileBroker_(
pset.getUntrackedParameter<
bool>(
"useFileBroker")),
46 fileBrokerHostFromCfg_(
pset.getUntrackedParameter<
bool>(
"fileBrokerHostFromCfg",
true)),
47 fileBrokerHost_(
pset.getUntrackedParameter<
std::
string>(
"fileBrokerHost",
"InValid")),
48 fileBrokerPort_(
pset.getUntrackedParameter<
std::
string>(
"fileBrokerPort",
"8080")),
49 fileBrokerKeepAlive_(
pset.getUntrackedParameter<
bool>(
"fileBrokerKeepAlive",
true)),
50 fileBrokerUseLocalLock_(
pset.getUntrackedParameter<
bool>(
"fileBrokerUseLocalLock",
true)),
51 fuLockPollInterval_(
pset.getUntrackedParameter<unsigned
int>(
"fuLockPollInterval", 2000)),
52 outputAdler32Recheck_(
pset.getUntrackedParameter<
bool>(
"outputAdler32Recheck",
false)),
53 requireTSPSet_(
pset.getUntrackedParameter<
bool>(
"requireTransfersPSet",
false)),
54 selectedTransferMode_(
pset.getUntrackedParameter<
std::
string>(
"selectedTransferMode",
"")),
55 mergeTypePset_(
pset.getUntrackedParameter<
std::
string>(
"mergingPset",
"")),
56 directorBU_(
pset.getUntrackedParameter<
bool>(
"directorIsBU",
false)),
57 hltSourceDirectory_(
pset.getUntrackedParameter<
std::
string>(
"hltSourceDirectory",
"")),
61 fu_readwritelock_fd_(-1),
62 fulocal_rwlock_fd_(-1),
63 fulocal_rwlock_fd2_(-1),
64 bu_w_lock_stream(nullptr),
65 bu_r_lock_stream(nullptr),
66 fu_rw_lock_stream(nullptr),
67 dirManager_(base_dir_),
69 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
70 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
71 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
73 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
74 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
83 gethostname(hostname, 32);
86 char* fuLockPollIntervalPtr = std::getenv(
"FFF_LOCKPOLLINTERVAL");
87 if (fuLockPollIntervalPtr) {
98 char* fileBrokerParamPtr = std::getenv(
"FFF_USEFILEBROKER");
99 if (fileBrokerParamPtr) {
112 if (
stat(
"/etc/appliance/bus.config", &
buf) == 0) {
117 throw cms::Exception(
"EvFDaqDirector") <<
"No file service or BU data address information";
120 <<
"fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
128 char* startFromLSPtr = std::getenv(
"FFF_START_LUMISECTION");
129 if (startFromLSPtr) {
139 char* fileBrokerUseLockParamPtr = std::getenv(
"FFF_FILEBROKERUSELOCALLOCK");
140 if (fileBrokerUseLockParamPtr) {
143 edm::LogInfo(
"EvFDaqDirector") <<
"Setting fileBrokerUseLocalLock parameter by environment string: " 158 <<
" Error while setting number of sources: size mismatch with BU base directory vector";
167 std::stringstream
ss;
168 ss <<
"run" << std::setfill(
'0') << std::setw(6) <<
run_;
170 ss = std::stringstream();
176 ss = std::stringstream();
183 int retval =
mkdir(
base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
184 if (retval != 0 && errno != EEXIST) {
186 <<
" Error checking for base dir -: " <<
base_dir_ <<
" mkdir error:" << strerror(errno);
191 retval =
mkdir(
run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
192 if (retval != 0 && errno != EEXIST) {
194 <<
" Error creating run dir -: " <<
run_dir_ <<
" mkdir error:" << strerror(errno);
202 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
205 <<
" Error creating/opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
206 chmod(fulocal_lock_.c_str(), 0777);
210 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
213 <<
" Error opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
225 if (retval != 0 && errno != EEXIST) {
227 <<
" Error creating bu run dir -: " <<
bu_run_dir_ <<
" mkdir error:" << strerror(errno);
231 if (retval != 0 && errno != EEXIST) {
233 <<
" Error creating bu run open dir -: " <<
bu_run_open_dir_ <<
" mkdir error:" << strerror(errno);
237 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
239 edm::LogWarning(
"EvFDaqDirector") <<
"problem with creating filedesc for buwritelock -: " << strerror(errno);
244 edm::LogWarning(
"EvFDaqDirector") <<
"Error creating write lock stream -: " << strerror(errno);
258 retval =
mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
259 if (retval != 0 && errno != EEXIST)
261 <<
" Error creating bu run dir -: " << hltdir <<
" mkdir error:" << strerror(errno);
263 std::filesystem::copy_file(
hltSourceDirectory_ +
"/HltConfig.py", tmphltdir +
"/HltConfig.py");
264 std::filesystem::copy_file(
hltSourceDirectory_ +
"/fffParameters.jsn", tmphltdir +
"/fffParameters.jsn");
266 std::string optfiles[3] = {
"hltinfo",
"blacklist",
"whitelist"};
267 for (
auto& optfile : optfiles) {
269 std::filesystem::copy_file(
hltSourceDirectory_ +
"/" + optfile, tmphltdir +
"/" + optfile);
274 std::filesystem::rename(tmphltdir, hltdir);
282 auto checkExists = [=](
std::string const& bu_base_dir) ->
void {
283 int retval =
mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
284 if (retval != 0 && errno != EEXIST) {
286 <<
" Error checking for bu base dir -: " << bu_base_dir <<
" mkdir error:" << strerror(errno);
290 auto waitForDir = [=](
std::string const& bu_base_dir) ->
void {
293 int retval =
mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
294 if (retval != 0 && errno != EEXIST) {
300 throw cms::Exception(
"DaqDirector") <<
" Error checking for bu base dir after 1 minute -: " << bu_base_dir
301 <<
" mkdir error:" << strerror(errno);
325 std::stringstream sstp;
332 if (!
stat(defPath.c_str(), &statbuf))
333 edm::LogInfo(
"EvFDaqDirector") <<
"found JSD file in ramdisk -: " << defPath;
336 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
337 defPath =
std::string(std::getenv(
"CMSSW_BASE")) +
"/" + defPathSuffix;
338 if (
stat(defPath.c_str(), &statbuf)) {
339 defPath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + defPathSuffix;
340 if (
stat(defPath.c_str(), &statbuf)) {
341 defPath = defPathSuffix;
347 DataPointDefinition::getDataPointDefinitionFor(defPath,
dpd_, &defLabel);
354 boost::system::error_code ec;
355 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
381 "Service used for file locking arbitration and for propagating information between other EvF components");
382 desc.addUntracked<
std::string>(
"baseDir",
".")->setComment(
"Local base directory for run output");
383 desc.addUntracked<
std::string>(
"buBaseDir",
".")->setComment(
"BU base ramdisk directory ");
384 desc.addUntracked<std::vector<std::string>>(
"buBaseDirsAll", std::vector<std::string>())
385 ->setComment(
"BU base ramdisk directories for multi-file DAQSource models");
386 desc.addUntracked<std::vector<int>>(
"buBaseDirsNumStreams", std::vector<int>())
387 ->setComment(
"Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
388 desc.addUntracked<
unsigned int>(
"runNumber", 0)->setComment(
"Run Number in ramdisk to open");
389 desc.addUntracked<
bool>(
"useFileBroker",
false)
390 ->setComment(
"Use BU file service to grab input data instead of NFS file locking");
391 desc.addUntracked<
bool>(
"fileBrokerHostFromCfg",
true)
392 ->setComment(
"Allow service to discover BU address from hltd configuration");
393 desc.addUntracked<
std::string>(
"fileBrokerHost",
"InValid")->setComment(
"BU file service host.");
394 desc.addUntracked<
std::string>(
"fileBrokerPort",
"8080")->setComment(
"BU file service port");
395 desc.addUntracked<
bool>(
"fileBrokerKeepAlive",
true)
396 ->setComment(
"Use keep alive to avoid using large number of sockets");
397 desc.addUntracked<
bool>(
"fileBrokerUseLocalLock",
true)
398 ->setComment(
"Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
399 desc.addUntracked<
unsigned int>(
"fuLockPollInterval", 2000)
400 ->setComment(
"Lock polling interval in microseconds for the input directory file lock");
401 desc.addUntracked<
bool>(
"outputAdler32Recheck",
false)
402 ->setComment(
"Check Adler32 of per-process output files while micro-merging");
403 desc.addUntracked<
bool>(
"requireTransfersPSet",
false)
404 ->setComment(
"Require complete transferSystem PSet in the process configuration");
406 ->setComment(
"Selected transfer mode (choice in Lvl0 propagated as Python parameter");
407 desc.addUntracked<
bool>(
"directorIsBU",
false)->setComment(
"BU director mode used for testing");
408 desc.addUntracked<
std::string>(
"hltSourceDirectory",
"")->setComment(
"BU director mode source directory");
410 ->setComment(
"Name of merging PSet to look for merging type definitions for streams");
411 descriptions.
add(
"EvFDaqDirector",
desc);
442 edm::LogWarning(
"EvFDaqDirector") <<
" Handles to check for files to delete were not set by the input source...";
556 int retval =
remove(
filename.c_str());
559 <<
". error = " << strerror(errno);
565 uint16_t& rawHeaderSize,
567 bool& setExceptionState) {
572 int lock_attempts = 0;
573 long total_lock_attempts = 0;
579 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
580 if (stopFileCheck == 0)
586 if (stopFileLS >= 0 && (
int)
ls >= stopFileLS)
590 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: " 596 timeval ts_lockbegin;
597 gettimeofday(&ts_lockbegin,
nullptr);
599 while (retval == -1) {
608 if (lock_attempts > 5000000 || errno == 116) {
611 <<
"Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
613 edm::LogWarning(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 seconds. Checking if run directory and " 614 "fu.lock file are present -: errno " 615 << errno <<
":" << strerror(errno) << std::endl;
618 edm::LogWarning(
"EvFDaqDirector") <<
"Detected local EoLS for lumisection " <<
ls;
630 if (total_lock_attempts > 5 * 60000000) {
631 edm::LogError(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 minutes. Stopping polling activity.";
637 gettimeofday(&ts_lockend,
nullptr);
638 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
640 lockWaitTime = deltat;
647 gettimeofday(&ts_lockend, 0);
651 int fu_readwritelock_fd2 = open(
fulockfile_.c_str(), O_RDWR, S_IRWXU);
652 if (fu_readwritelock_fd2 == -1)
654 <<
" create. error:" << strerror(errno);
656 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2,
"r+");
659 if (fu_rw_lock_stream2 !=
nullptr) {
663 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
667 fscanf(fu_rw_lock_stream2,
"%u %u", &readLs, &
readIndex);
670 unsigned int currentLs = readLs;
671 bool bumpedOk =
false;
674 if (
ls &&
ls + 1 < currentLs)
678 bumpedOk =
bumpFile(readLs,
readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
680 if (
ls && readLs > currentLs && currentLs >
ls) {
682 readLs = currentLs =
ls;
687 if (
ls == 0 && readLs > currentLs) {
702 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
704 ftruncate(fu_readwritelock_fd2, 0);
706 fprintf(fu_rw_lock_stream2,
"%u %u", readLs,
readIndex + 1);
707 fflush(fu_rw_lock_stream2);
708 fsync(fu_readwritelock_fd2);
710 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" <<
readIndex + 1;
713 <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
714 setExceptionState =
true;
717 }
else if (currentLs < readLs) {
719 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
721 ftruncate(fu_readwritelock_fd2, 0);
723 fprintf(fu_rw_lock_stream2,
"%u %u", readLs,
readIndex);
724 fflush(fu_rw_lock_stream2);
725 fsync(fu_readwritelock_fd2);
726 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" <<
readIndex;
729 <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
730 setExceptionState =
true;
735 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for reading failed with error " 739 edm::LogError(
"EvFDaqDirector") <<
"fu read/write lock stream is invalid " << strerror(errno);
741 fclose(fu_rw_lock_stream2);
744 timeval ts_preunlock;
745 gettimeofday(&ts_preunlock, 0);
746 int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
747 double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
758 edm::LogError(
"EvFDaqDirector") <<
"Error unlocking the fu.lock " << strerror(errno);
761 edm::LogDebug(
"EvFDaqDirector") <<
"Waited during lock -: " << locked_period <<
" seconds";
764 if (fileStatus ==
noFile) {
769 if (stopFileLS >= 0 && (
int)
ls > stopFileLS) {
770 edm::LogInfo(
"EvFDaqDirector") <<
"Reached maximum lumisection set by hltd";
778 std::ifstream ij(BUEoLSFile);
782 if (!
reader.parse(ij, deserializeRoot)) {
783 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" << BUEoLSFile;
789 dp.deserialize(deserializeRoot);
797 while (!
def.empty()) {
799 if (
def.find(
'/') == 0)
807 DataPointDefinition::getDataPointDefinitionFor(
fullpath, &eolsDpd, &defLabel);
812 DataPointDefinition::getDataPointDefinitionFor(
fullpath, &eolsDpd, &defLabel);
814 for (
unsigned int i = 0;
i < eolsDpd.
getNames().size();
i++)
815 if (eolsDpd.
getNames().at(
i) ==
"NFiles")
821 if (
def.size() <= 1 ||
def.find(
'/') == std::string::npos) {
832 edm::LogError(
"EvFDaqDirector") <<
" error reading number of files from BU JSON -: " << BUEoLSFile;
835 return std::stoi(
data);
842 uint16_t& rawHeaderSize,
844 bool& setExceptionState) {
856 if (maxLS >= 0 &&
ls > (
unsigned int)maxLS)
860 std::stringstream
ss;
864 if (
stat(nextFileJson.c_str(), &
buf) == 0) {
866 nextFile = nextFileJson;
875 nextFile = nextFileRaw;
881 if (
stat(BUEoLSFile.c_str(), &
buf) == 0) {
883 if (
stat(nextFileJson.c_str(), &
buf) == 0) {
885 nextFile = nextFileJson;
890 nextFile = nextFileRaw;
895 if (indexFilesInLS < 0)
900 if ((
int)
index < indexFilesInLS) {
903 <<
"Potential miss of index file in LS -: " <<
ls <<
". Missing " << nextFile <<
" because " 904 << indexFilesInLS - 1 <<
" is the highest index expected. Will not update fu.lock file";
905 setExceptionState =
true;
914 if (maxLS >= 0 &&
ls > (
unsigned int)maxLS)
919 if (
stat(nextFileJson.c_str(), &
buf) == 0) {
922 nextFile = nextFileJson;
927 nextFile = nextFileRaw;
939 edm::LogError(
"EvFDaqDirector") <<
"Error creating fu read/write lock stream " << strerror(errno);
941 edm::LogInfo(
"EvFDaqDirector") <<
"Initializing FU LOCK FILE";
950 open(
fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
957 <<
" create:" <<
create <<
" error:" << strerror(errno);
963 edm::LogError(
"EvFDaqDirector") <<
"problem with opening fuwritelock file stream -: " << strerror(errno);
994 if (checkIfExists ==
false ||
stat(fuBoLS.c_str(), &
buf) != 0) {
995 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1001 const uint32_t currentLumiSection,
1003 bool doCreateEoLS) {
1004 if (currentLumiSection > 0) {
1011 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1017 }
else if (doCreateBoLS) {
1024 uint16_t& rawHeaderSize,
1025 uint16_t& rawDataType,
1026 uint32_t& lsFromHeader,
1027 int32_t& eventsFromHeader,
1028 int64_t& fileSizeFromHeader,
1034 if ((
infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1037 <<
"parseFRDFileHeader - failed to open input file -: " << rawSourcePath <<
" : " << strerror(errno);
1049 if ((
infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1051 <<
"parseFRDFileHeader - failed to open input file -: " << rawSourcePath <<
" : " << strerror(errno);
1052 if (errno == ENOENT)
1068 if (frd_version == 0) {
1070 if (requireHeader) {
1071 edm::LogError(
"EvFDaqDirector") <<
"no header or invalid version string found in:" << rawSourcePath;
1076 lseek(
infile, 0, SEEK_SET);
1079 eventsFromHeader = -1;
1080 fileSizeFromHeader = -1;
1082 }
else if (frd_version == 1) {
1089 edm::LogError(
"EvFDaqDirector") <<
"inconsistent header size: " << rawSourcePath <<
" size: " << headerSizeRaw
1090 <<
" v:" << frd_version;
1097 eventsFromHeader = (int32_t)fhContent->
eventCount_;
1098 fileSizeFromHeader = (int64_t)fhContent->
fileSize_;
1101 }
else if (frd_version == 2) {
1108 edm::LogError(
"EvFDaqDirector") <<
"inconsistent header size: " << rawSourcePath <<
" size: " << headerSizeRaw
1109 <<
" v:" << frd_version;
1116 eventsFromHeader = (int32_t)fhContent->
eventCount_;
1117 fileSizeFromHeader = (int64_t)fhContent->
fileSize_;
1133 edm::LogError(
"EvFDaqDirector") <<
"rawFileHasHeader - unable to read " <<
path <<
" : " << strerror(errno);
1138 if ((
size_t)sz_read < buf_sz) {
1139 edm::LogError(
"EvFDaqDirector") <<
"rawFileHasHeader - file smaller than header: " <<
path;
1149 if ((
infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1150 edm::LogWarning(
"EvFDaqDirector") <<
"rawFileHasHeader - failed to open input file -: " << rawSourcePath <<
" : " 1161 if (frd_version == 1) {
1168 }
else if (frd_version == 2) {
1176 edm::LogError(
"EvFDaqDirector") <<
"rawFileHasHeader - unknown version: " << frd_version;
1185 uint16_t& rawHeaderSize,
1186 int64_t& fileSizeFromHeader,
1190 bool requireHeader) {
1195 size_t pos = 0, n_tokens = 0;
1196 while (n_tokens++ < 3 && (
pos = jsonStem.find(
'_',
pos + 1)) != std::string::npos) {
1200 std::ostringstream fileNameWithPID;
1202 fileNameWithPID << reducedJsonStem <<
"_pid" << std::setfill(
'0') << std::setw(5) <<
pid_ <<
".jsn";
1206 LogDebug(
"EvFDaqDirector") <<
"RAW parse -: " << rawSourcePath <<
" and JSON create " << jsonDestPath;
1210 int32_t nbEventsWrittenRaw;
1211 int64_t fileSizeFromRaw;
1212 uint16_t rawDataType;
1230 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1231 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1232 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233 if (errno == EEXIST) {
1234 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1238 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath <<
" : " 1240 struct stat out_stat;
1241 if (
stat(jsonDestPath.c_str(), &out_stat) == 0) {
1243 <<
"grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: " 1245 if (unlink(jsonDestPath.c_str()) == -1) {
1247 <<
"grabNextJsonFromRaw - failed to remove -: " << jsonDestPath <<
" : " << strerror(errno);
1250 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1251 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to open output file (on retry) -: " 1252 << jsonDestPath <<
" : " << strerror(errno);
1257 std::stringstream
ss;
1258 ss <<
"{\"data\":[" << nbEventsWrittenRaw <<
"," << fileSizeFromRaw <<
",\"" << rawSourcePath <<
"\"]}";
1262 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1263 <<
" : " << strerror(errno);
1267 if (serverLS && serverLS != lsFromRaw)
1268 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1269 <<
" and raw file header LS " << lsFromRaw;
1271 fileSizeFromHeader = fileSizeFromRaw;
1272 return nbEventsWrittenRaw;
1277 int64_t& fileSizeFromJson,
1282 std::ostringstream fileNameWithPID;
1283 fileNameWithPID <<
std::filesystem::path(rawSourcePath).stem().string() <<
"_pid" << std::setfill(
'0')
1284 << std::setw(5) <<
pid_ <<
".jsn";
1289 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
1293 if ((
infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1294 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file -: " << jsonSourcePath <<
" : " 1296 if ((
infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1297 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file (on retry) -: " 1298 << jsonSourcePath <<
" : " << strerror(errno);
1299 if (errno == ENOENT)
1305 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1306 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1307 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1308 if (errno == EEXIST) {
1309 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - destination file already exists -: " << jsonDestPath
1314 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file -: " << jsonDestPath <<
" : " 1316 struct stat out_stat;
1317 if (
stat(jsonDestPath.c_str(), &out_stat) == 0) {
1319 <<
"grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1320 if (unlink(jsonDestPath.c_str()) == -1) {
1322 <<
"grabNextJsonFile - failed to remove -: " << jsonDestPath <<
" : " << strerror(errno);
1325 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1326 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file (on retry) -: " 1327 << jsonDestPath <<
" : " << strerror(errno);
1333 const std::size_t buf_sz = 512;
1334 std::size_t tot_written = 0;
1335 std::unique_ptr<char[]>
buf(
new char[buf_sz]);
1337 ssize_t sz, sz_read = 1, sz_write;
1338 while (sz_read > 0 && (sz_read = ::
read(
infile,
buf.get(), buf_sz)) > 0) {
1341 assert(sz_read - sz_write > 0);
1342 if ((sz = ::
write(
outfile,
buf.get() + sz_write, sz_read - sz_write)) < 0) {
1349 }
while (sz_write < sz_read);
1354 if (tot_written > 0) {
1356 if (unlink(jsonSourcePath.c_str()) == -1) {
1357 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to remove -: " << jsonSourcePath <<
" : " 1362 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to copy json file or file was empty -: " 1371 std::stringstream
ss;
1374 if (tot_written <= buf_sz) {
1379 std::ifstream ij(jsonDestPath);
1381 }
catch (std::filesystem::filesystem_error
const& ex) {
1382 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1388 if (tot_written <= buf_sz)
1390 edm::LogError(
"EvFDaqDirector") <<
"Failed to deserialize JSON file -: " << jsonDestPath <<
"\nERROR:\n" 1391 <<
reader.getFormatedErrorMessages() <<
"CONTENT:\n" 1398 dp.deserialize(deserializeRoot);
1402 if (
i <
dp.getData().size()) {
1409 if (!
dp.getData().empty())
1413 <<
"grabNextJsonFile - " 1414 <<
" error reading number of events from BU JSON; No input value. data -: " <<
data;
1420 fileSizeFromJson = -1;
1423 if (
i <
dp.getData().size()) {
1426 fileSizeFromJson = std::stol(dataSize);
1429 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of Bytes from BU JSON. " 1430 <<
"Input value is -: " << dataSize;
1436 return std::stoi(
data);
1437 }
catch (
const std::out_of_range&
e) {
1438 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of events from BU JSON. " 1439 <<
"Input value is -: " <<
data;
1440 }
catch (
const std::invalid_argument&
e) {
1441 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - argument error parsing events from BU JSON. " 1442 <<
"Input value is -: " <<
data;
1443 }
catch (std::runtime_error
const&
e) {
1445 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - std::runtime_error exception -: " <<
e.what();
1449 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " <<
e.what();
1452 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1465 std::ostringstream fileNameWithPID;
1466 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid()
1468 jsonDestPath /= fileNameWithPID.str();
1470 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
1473 }
catch (std::filesystem::filesystem_error
const& ex) {
1475 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1485 }
catch (std::filesystem::filesystem_error
const& ex) {
1487 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1490 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
1493 std::ifstream ij(jsonDestPath);
1497 std::stringstream
ss;
1499 if (!
reader.parse(
ss.str(), deserializeRoot)) {
1500 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1502 <<
reader.getFormatedErrorMessages() <<
"CONTENT:\n" 1504 throw std::runtime_error(
"Cannot deserialize input JSON file");
1510 dp.deserialize(deserializeRoot);
1514 if (
i <
dp.getData().size()) {
1520 if (!
dp.getData().empty())
1524 <<
" error reading number of events from BU JSON -: No input value " <<
data;
1526 return std::stoi(
data);
1527 }
catch (std::filesystem::filesystem_error
const& ex) {
1530 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1531 }
catch (std::runtime_error
const&
e) {
1534 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile runtime Exception -: " <<
e.what();
1535 }
catch (
const std::out_of_range&) {
1536 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile error parsing number of events from BU JSON. " 1537 <<
"Input value is -: " <<
data;
1538 }
catch (
const std::invalid_argument&) {
1539 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile argument error parsing events from BU JSON. " 1540 <<
"Input value is -: " <<
data;
1544 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " <<
e.what();
1553 uint32_t& closedServerLS,
1559 serverError =
false;
1561 boost::system::error_code ec;
1569 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::connect error -:" << ec;
1575 boost::asio::streambuf request;
1576 std::ostream request_stream(&request);
1579 std::stringstream spath;
1580 spath <<
path <<
"&stopls=" << maxLS;
1584 request_stream <<
"GET " <<
path <<
" HTTP/1.1\r\n";
1586 request_stream <<
"Accept: */*\r\n";
1587 request_stream <<
"Connection: keep-alive\r\n\r\n";
1592 edm::LogInfo(
"EvFDaqDirector") <<
"reconnecting socket on received connection_reset";
1596 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::connect error -:" << ec;
1602 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::write error -:" << ec;
1607 boost::asio::streambuf response;
1608 boost::asio::read_until(*
socket_, response,
"\r\n", ec);
1610 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::read_until error -:" << ec;
1615 std::istream response_stream(&response);
1618 response_stream >> http_version;
1620 response_stream >> serverHttpStatus;
1623 std::getline(response_stream, status_message);
1624 if (!response_stream || http_version.substr(0, 5) !=
"HTTP/") {
1629 if (serverHttpStatus != 200) {
1630 edm::LogWarning(
"EvFDaqDirector") <<
"Response returned with status code " << serverHttpStatus;
1637 while (std::getline(response_stream,
header) &&
header !=
"\r") {
1641 std::map<std::string, std::string> serverMap;
1642 while (std::getline(response_stream, fileInfo) && fileInfo !=
"\r") {
1643 auto pos = fileInfo.find(
'=');
1644 if (
pos == std::string::npos)
1646 auto stitle = fileInfo.substr(0,
pos);
1647 auto svalue = fileInfo.substr(
pos + 1);
1648 serverMap[stitle] = svalue;
1652 auto server_version = serverMap.find(
"version");
1653 assert(server_version != serverMap.end());
1655 auto server_run = serverMap.find(
"runnumber");
1656 assert(server_run != serverMap.end());
1659 auto server_state = serverMap.find(
"state");
1660 assert(server_state != serverMap.end());
1662 auto server_eols = serverMap.find(
"lasteols");
1663 assert(server_eols != serverMap.end());
1665 auto server_ls = serverMap.find(
"lumisection");
1667 int version_maj = 1;
1668 int version_min = 0;
1669 int version_rev = 0;
1671 auto* s_ptr = server_version->second.c_str();
1672 if (!server_version->second.empty() && server_version->second[0] ==
'"')
1674 auto res = sscanf(s_ptr,
"%d.%d.%d", &version_maj, &version_min, &version_rev);
1676 res = sscanf(s_ptr,
"%d.%d", &version_maj, &version_min);
1678 res = sscanf(s_ptr,
"%d", &version_maj);
1681 edm::LogWarning(
"EvFDaqDirector") <<
"Can not parse server version " << server_version->second;
1688 if (server_ls != serverMap.end())
1691 serverLS = closedServerLS + 1;
1694 if (s_state ==
"STARTING")
1696 auto server_file = serverMap.find(
"file");
1697 assert(server_file == serverMap.end());
1699 edm::LogInfo(
"EvFDaqDirector") <<
"Got STARTING notification with last EOLS " << closedServerLS;
1700 }
else if (s_state ==
"READY") {
1701 auto server_file = serverMap.find(
"file");
1702 if (server_file == serverMap.end()) {
1704 if (serverLS <= closedServerLS)
1705 serverLS = closedServerLS + 1;
1708 <<
"Got READY notification with last EOLS " << closedServerLS <<
" and no new file";
1712 auto server_fileprefix = serverMap.find(
"fileprefix");
1714 if (server_fileprefix != serverMap.end()) {
1715 auto pssize = server_fileprefix->second.size();
1716 if (pssize > 1 && server_fileprefix->second[0] ==
'"' && server_fileprefix->second[pssize - 1] ==
'"')
1717 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1719 fileprefix = server_fileprefix->second;
1723 auto ssize = server_file->second.size();
1724 if (ssize > 1 && server_file->second[0] ==
'"' && server_file->second[ssize - 1] ==
'"')
1725 filestem = server_file->second.substr(1, ssize - 2);
1727 filestem = server_file->second;
1728 assert(!filestem.empty());
1729 if (version_maj > 1) {
1730 nextFileRaw =
bu_run_dir_ +
"/" + fileprefix + filestem +
".raw";
1731 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1735 nextFileRaw =
bu_run_dir_ +
"/" + filestem +
".raw";
1736 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1737 nextFileJson = filestem +
".jsn";
1741 edm::LogInfo(
"EvFDaqDirector") <<
"Got READY notification with last EOLS " << closedServerLS <<
" new LS " 1742 << serverLS <<
" file:" << filestem;
1744 }
else if (s_state ==
"EOLS") {
1745 serverLS = closedServerLS + 1;
1746 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOLS notification with last EOLS " << closedServerLS;
1748 }
else if (s_state ==
"EOR") {
1750 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOR notification with last EOLS " << closedServerLS;
1752 }
else if (s_state ==
"NORUN") {
1753 auto err_msg = serverMap.find(
"errormessage");
1754 if (err_msg != serverMap.end())
1755 edm::LogWarning(
"EvFDaqDirector") <<
"Server NORUN -:" << server_state->second <<
" : " << err_msg->second;
1760 }
else if (s_state ==
"ERROR") {
1761 auto err_msg = serverMap.find(
"errormessage");
1762 if (err_msg != serverMap.end())
1763 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second <<
" : " << err_msg->second;
1765 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second;
1769 edm::LogWarning(
"EvFDaqDirector") <<
"Unknown Server state -:" << server_state->second;
1778 if (ec != boost::asio::error::eof) {
1779 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::read_until error -:" << ec;
1793 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1795 edm::LogWarning(
"EvFDaqDirector") <<
"socket shutdown error -:" << ec;
1820 uint16_t& rawHeaderSize,
1821 int32_t& serverEventsInNewFile,
1822 int64_t& fileSizeFromMetadata,
1824 bool requireHeader) {
1832 int stopFileLS = -1;
1835 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1836 if (stopFileCheck == 0)
1842 if (stopFileLS >= 0 && (
int)
ls >= stopFileLS)
1846 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: " 1860 timeval ts_lockbegin;
1861 gettimeofday(&ts_lockbegin,
nullptr);
1864 uint32_t serverLS, closedServerLS;
1865 unsigned int serverHttpStatus;
1872 int maxLS = stopFileLS < 0 ? -1 :
std::max(stopFileLS, (
int)currentLumiSection);
1873 bool rawHeader =
false;
1875 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1885 if (currentLumiSection == 0) {
1891 if (closedServerLS >= currentLumiSection) {
1893 for (uint32_t
i =
std::max(currentLumiSection, 1
U);
i <= closedServerLS;
i++)
1898 bool fileFound =
true;
1903 nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS,
false, requireHeader);
1905 serverEventsInNewFile =
grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1908 if (serverEventsInNewFile < 0 && rawFd != -1) {
1929 if (currentLumiSection == 0) {
1939 if (closedServerLS >= currentLumiSection) {
1942 for (uint32_t
i =
std::max(currentLumiSection, 1
U);
i <= closedServerLS;
i++)
1950 else if (fileStatus ==
newFile) {
1953 }
else if (fileStatus ==
noFile) {
1957 edm::LogWarning(
"EvFDaqDirector") <<
"Server reported LS " << serverLS
1958 <<
" which is smaller than currently open LS " <<
ls <<
". Ignoring response";
1970 if (!std::filesystem::is_directory(openPath)) {
1971 LogDebug(
"EvFDaqDirector") <<
"<open> FU dir not found. Creating... -:" << openPath.string();
1972 std::filesystem::create_directories(openPath);
1977 std::ifstream ij(
file);
1981 if (!
reader.parse(ij, deserializeRoot)) {
1982 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" <<
file;
1986 int ret = deserializeRoot.
get(
"lastLS",
"").
asInt();
1994 unsigned int lscount = 1;
1996 std::stringstream
ss;
1997 ss << fileprefix << std::setfill(
'0') << std::setw(4) << lscount <<
"_EoLS.jsn";
2015 std::vector<std::string>
destinations = tsPset.getParameter<std::vector<std::string>>(
"destinations");
2017 destinationsVal.append(
dest);
2018 (*transferSystemJson_)[
"destinations"] = destinationsVal;
2021 std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>(
"transferModes");
2022 for (
auto&
mode : modes)
2024 (*transferSystemJson_)[
"transferModes"] = modesVal;
2026 for (
auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
2027 if (psKeyItr->first !=
"destinations" && psKeyItr->first !=
"transferModes") {
2030 for (
auto&
mode : modes) {
2032 if (!streamDef.
existsAs<std::vector<std::string>>(
mode,
true))
2034 <<
" Missing transfer system specification for -:" << psKeyItr->first <<
" (transferMode " <<
mode 2036 std::vector<std::string> streamDestinations = streamDef.
getParameter<std::vector<std::string>>(
mode);
2040 if (streamDestinations.empty())
2042 <<
" Missing transter system destination(s) for -: " << psKeyItr->first <<
", mode:" <<
mode;
2044 for (
auto& sdest : streamDestinations) {
2045 bool sDestValid =
false;
2046 sDestsValue.append(sdest);
2053 <<
" Invalid transter system destination specified for -: " << psKeyItr->first <<
", mode:" <<
mode 2054 <<
", dest:" << sdest;
2056 streamVal[
mode] = sDestsValue;
2058 (*transferSystemJson_)[psKeyItr->first] = streamVal;
2063 throw cms::Exception(
"EvFDaqDirector") <<
"transferSystem PSet not found";
2070 streamRequestName =
stream;
2072 std::stringstream
msg;
2073 msg <<
"Transfer system mode definitions missing for -: " <<
stream;
2084 <<
"Selected mode string is not provided as DaqDirector parameter." 2085 <<
"Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
2089 throw cms::Exception(
"EvFDaqDirector") <<
"Selected mode string is not provided as DaqDirector parameter.";
2093 std::stringstream
msg;
2108 ret += (*it).asString();
2123 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2125 ac->second = streamType;
2132 tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2134 return search_ac->second;
2136 edm::LogInfo(
"EvFDaqDirector") <<
" No merging type specified for stream " <<
stream <<
". Using default value";
2138 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2140 ac->second = defaultName;
2147 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2148 close(proc_flag_fd);
2151 struct flock
EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
int def(FILE *, FILE *, int)
unsigned int nConcurrentLumis_
const_iterator end() const
std::vector< std::string > bu_base_dirs_all_
LuminosityBlockNumber_t luminosityBlock() const
T getParameter(std::string const &) const
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
const_iterator begin() const
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
ret
prodAgent to be discontinued
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
ParameterSet const & getParameterSet(std::string const &) const
Value get(UInt index, const Value &defaultValue) const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Value & append(const Value &value)
Append value to array at the end.
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
bool existsAs(std::string const ¶meterName, bool trackiness=true) const
checks if a parameter exists as a given type
Log< level::Error, false > LogError
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool fileBrokerHostFromCfg_
bool isExceptionOnData(unsigned int ls)
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
unsigned long previousFileSize_
static std::string to_string(const XMLCh *ch)
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createRunOpendirMaybe()
std::string getInitTempFilePath(std::string const &stream) const
std::string getEoRFilePathOnFU() const
std::string hltSourceDirectory_
unsigned int startFromLS_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getRunOpenDirPath() const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void tryInitializeFuLockFile()
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string input_throttled_file_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
LuminosityBlockID const & luminosityBlockID() const
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::vector< std::string > const & getNames() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
ParameterSet const & getParameterSet(ParameterSetID const &id)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findHighestRunDir()
char data[epos_bytes_allocation]
bool fileBrokerUseLocalLock_
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
Unserialize a JSON document into a Value.
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
Iterator for object and array value.
std::vector< int > bu_base_dirs_nSources_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
bool fileBrokerKeepAlive_
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
ParameterSetID const & parameterSetID() const
std::string bu_run_open_dir_
array value (ordered list)