25 #include <boost/lexical_cast.hpp> 26 #include <boost/filesystem/fstream.hpp> 27 #include <boost/algorithm/string.hpp> 38 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {
"",
"DAT",
"PB",
"JSNDATA"};
41 : base_dir_(pset.getUntrackedParameter<
std::
string>(
"baseDir")),
42 bu_base_dir_(pset.getUntrackedParameter<
std::
string>(
"buBaseDir")),
43 run_(pset.getUntrackedParameter<unsigned
int>(
"runNumber")),
44 useFileBroker_(pset.getUntrackedParameter<
bool>(
"useFileBroker")),
45 fileBrokerHostFromCfg_(pset.getUntrackedParameter<
bool>(
"fileBrokerHostFromCfg",
true)),
46 fileBrokerHost_(pset.getUntrackedParameter<
std::
string>(
"fileBrokerHost",
"InValid")),
47 fileBrokerPort_(pset.getUntrackedParameter<
std::
string>(
"fileBrokerPort",
"8080")),
48 fileBrokerKeepAlive_(pset.getUntrackedParameter<
bool>(
"fileBrokerKeepAlive",
true)),
49 fileBrokerUseLocalLock_(pset.getUntrackedParameter<
bool>(
"fileBrokerUseLocalLock",
true)),
50 fuLockPollInterval_(pset.getUntrackedParameter<unsigned
int>(
"fuLockPollInterval", 2000)),
51 outputAdler32Recheck_(pset.getUntrackedParameter<
bool>(
"outputAdler32Recheck",
false)),
52 requireTSPSet_(pset.getUntrackedParameter<
bool>(
"requireTransfersPSet",
false)),
53 selectedTransferMode_(pset.getUntrackedParameter<
std::
string>(
"selectedTransferMode",
"")),
54 mergeTypePset_(pset.getUntrackedParameter<
std::
string>(
"mergingPset",
"")),
55 directorBU_(pset.getUntrackedParameter<
bool>(
"directorIsBU",
false)),
56 hltSourceDirectory_(pset.getUntrackedParameter<
std::
string>(
"hltSourceDirectory",
"")),
60 fu_readwritelock_fd_(-1),
61 fulocal_rwlock_fd_(-1),
62 fulocal_rwlock_fd2_(-1),
66 dirManager_(base_dir_),
68 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
82 gethostname(hostname, 32);
85 char* fuLockPollIntervalPtr = std::getenv(
"FFF_LOCKPOLLINTERVAL");
86 if (fuLockPollIntervalPtr) {
91 }
catch (boost::bad_lexical_cast
const&) {
97 char* fileBrokerParamPtr = std::getenv(
"FFF_USEFILEBROKER");
98 if (fileBrokerParamPtr) {
102 }
catch (boost::bad_lexical_cast
const&) {
111 if (
stat(
"/etc/appliance/bus.config", &buf) == 0) {
116 throw cms::Exception(
"EvFDaqDirector") <<
"No file service or BU data address information";
119 <<
"fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
127 char* startFromLSPtr = std::getenv(
"FFF_STARTFROMLS");
128 if (startFromLSPtr) {
132 }
catch (boost::bad_lexical_cast
const&) {
138 char* fileBrokerUseLockParamPtr = std::getenv(
"FFF_FILEBROKERUSELOCALLOCK");
139 if (fileBrokerUseLockParamPtr) {
142 edm::LogInfo(
"EvFDaqDirector") <<
"Setting fileBrokerUseLocalLock parameter by environment string: " 144 }
catch (boost::bad_lexical_cast
const&) {
151 std::stringstream
ss;
152 ss <<
"run" << std::setfill(
'0') << std::setw(6) <<
run_;
154 ss = std::stringstream();
158 ss = std::stringstream();
163 int retval =
mkdir(
base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164 if (retval != 0 && errno != EEXIST) {
166 <<
" Error checking for base dir -: " <<
base_dir_ <<
" mkdir error:" << strerror(errno);
171 retval =
mkdir(
run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172 if (retval != 0 && errno != EEXIST) {
174 <<
" Error creating run dir -: " <<
run_dir_ <<
" mkdir error:" << strerror(errno);
182 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
185 <<
" Error creating/opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
186 chmod(fulocal_lock_.c_str(), 0777);
190 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
193 <<
" Error opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
205 if (retval != 0 && errno != EEXIST) {
207 <<
" Error creating bu run dir -: " <<
bu_run_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
211 if (retval != 0 && errno != EEXIST) {
213 <<
" Error creating bu run open dir -: " <<
bu_run_open_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
217 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
219 edm::LogWarning(
"EvFDaqDirector") <<
"problem with creating filedesc for buwritelock -: " << strerror(errno);
224 edm::LogWarning(
"EvFDaqDirector") <<
"Error creating write lock stream -: " << strerror(errno);
238 retval =
mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239 if (retval != 0 && errno != EEXIST)
241 <<
" Error creating bu run dir -: " << hltdir <<
" mkdir error:" << strerror(errno) <<
"\n";
243 boost::filesystem::copy_file(
hltSourceDirectory_ +
"/HltConfig.py", tmphltdir +
"/HltConfig.py");
245 boost::filesystem::copy_file(
hltSourceDirectory_ +
"/fffParameters.jsn", tmphltdir +
"/fffParameters.jsn");
247 boost::filesystem::rename(tmphltdir, hltdir);
256 if (retval != 0 && errno != EEXIST) {
258 <<
" Error checking for bu base dir -: " <<
bu_base_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
269 std::stringstream sstp;
276 if (!
stat(defPath.c_str(), &statbuf))
277 edm::LogInfo(
"EvFDaqDirector") <<
"found JSD file in ramdisk -: " << defPath;
280 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
281 defPath =
std::string(std::getenv(
"CMSSW_BASE")) +
"/" + defPathSuffix;
282 if (
stat(defPath.c_str(), &statbuf)) {
283 defPath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + defPathSuffix;
284 if (
stat(defPath.c_str(), &statbuf)) {
285 defPath = defPathSuffix;
291 DataPointDefinition::getDataPointDefinitionFor(defPath,
dpd_, &defLabel);
298 boost::system::error_code
ec;
299 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
324 "Service used for file locking arbitration and for propagating information between other EvF components");
327 desc.
addUntracked<
unsigned int>(
"runNumber", 0)->setComment(
"Run Number in ramdisk to open");
329 ->setComment(
"Use BU file service to grab input data instead of NFS file locking");
331 ->setComment(
"Allow service to discover BU address from hltd configuration");
335 ->setComment(
"Use keep alive to avoid using large number of sockets");
337 ->setComment(
"Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
338 desc.
addUntracked<
unsigned int>(
"fuLockPollInterval", 2000)
339 ->setComment(
"Lock polling interval in microseconds for the input directory file lock");
341 ->setComment(
"Check Adler32 of per-process output files while micro-merging");
343 ->setComment(
"Require complete transferSystem PSet in the process configuration");
345 ->setComment(
"Selected transfer mode (choice in Lvl0 propagated as Python parameter");
346 desc.
addUntracked<
bool>(
"directorIsBU",
false)->setComment(
"BU director mode used for testing");
349 ->setComment(
"Name of merging PSet to look for merging type definitions for streams");
350 descriptions.
add(
"EvFDaqDirector", desc);
381 edm::LogWarning(
"EvFDaqDirector") <<
" Handles to check for files to delete were not set by the input source...";
388 if (it->second->lumi_ == ls) {
489 int retval =
remove(filename.c_str());
491 edm::LogError(
"EvFDaqDirector") <<
"Could not remove used file -: " << filename
492 <<
". error = " << strerror(errno);
504 int lock_attempts = 0;
505 long total_lock_attempts = 0;
511 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
512 if (stopFileCheck == 0)
518 if (stopFileLS >= 0 && (
int)ls >= stopFileLS)
522 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: " 528 timeval ts_lockbegin;
529 gettimeofday(&ts_lockbegin,
nullptr);
531 while (retval == -1) {
540 if (lock_attempts > 5000000 || errno == 116) {
543 <<
"Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
545 edm::LogWarning(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 seconds. Checking if run directory and " 546 "fu.lock file are present -: errno " 547 << errno <<
":" << strerror(errno) << std::endl;
550 edm::LogWarning(
"EvFDaqDirector") <<
"Detected local EoLS for lumisection " <<
ls;
562 if (total_lock_attempts > 5 * 60000000) {
563 edm::LogError(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 minutes. Stopping polling activity.";
569 gettimeofday(&ts_lockend,
nullptr);
570 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
572 lockWaitTime = deltat;
579 gettimeofday(&ts_lockend, 0);
583 int fu_readwritelock_fd2 = open(
fulockfile_.c_str(), O_RDWR, S_IRWXU);
584 if (fu_readwritelock_fd2 == -1)
586 <<
" create. error:" << strerror(errno);
588 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2,
"r+");
591 if (fu_rw_lock_stream2 !=
nullptr) {
595 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
599 fscanf(fu_rw_lock_stream2,
"%u %u", &readLs, &readIndex);
602 unsigned int currentLs = readLs;
603 bool bumpedOk =
false;
606 if (ls && ls + 1 < currentLs)
610 bumpedOk =
bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
612 if (ls && readLs > currentLs && currentLs > ls) {
614 readLs = currentLs =
ls;
619 if (ls == 0 && readLs > currentLs) {
634 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
636 ftruncate(fu_readwritelock_fd2, 0);
638 fprintf(fu_rw_lock_stream2,
"%u %u", readLs, readIndex + 1);
639 fflush(fu_rw_lock_stream2);
640 fsync(fu_readwritelock_fd2);
642 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" << readIndex + 1;
645 <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
647 }
else if (currentLs < readLs) {
649 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
651 ftruncate(fu_readwritelock_fd2, 0);
653 fprintf(fu_rw_lock_stream2,
"%u %u", readLs, readIndex);
654 fflush(fu_rw_lock_stream2);
655 fsync(fu_readwritelock_fd2);
656 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" <<
readIndex;
659 <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
663 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for reading failed with error " 667 edm::LogError(
"EvFDaqDirector") <<
"fu read/write lock stream is invalid " << strerror(errno);
669 fclose(fu_rw_lock_stream2);
672 timeval ts_preunlock;
673 gettimeofday(&ts_preunlock, 0);
674 int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
675 double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
686 edm::LogError(
"EvFDaqDirector") <<
"Error unlocking the fu.lock " << strerror(errno);
689 edm::LogDebug(
"EvFDaqDirector") <<
"Waited during lock -: " << locked_period <<
" seconds";
692 if (fileStatus ==
noFile) {
697 if (stopFileLS >= 0 && (
int)ls > stopFileLS) {
698 edm::LogInfo(
"EvFDaqDirector") <<
"Reached maximum lumisection set by hltd";
706 boost::filesystem::ifstream ij(BUEoLSFile);
710 if (!reader.
parse(ij, deserializeRoot)) {
711 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" << BUEoLSFile;
725 while (!def.empty()) {
727 if (def.find(
'/') == 0)
732 if (
stat(fullpath.c_str(), &buf) == 0) {
735 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
740 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
742 for (
unsigned int i = 0;
i < eolsDpd.
getNames().size();
i++)
743 if (eolsDpd.
getNames().at(
i) ==
"NFiles")
749 if (def.size() <= 1 || def.find(
'/') == std::string::npos) {
753 def = def.substr(def.find(
'/') + 1);
760 edm::LogError(
"EvFDaqDirector") <<
" error reading number of files from BU JSON -: " << BUEoLSFile;
763 return boost::lexical_cast<
int>(
data);
767 unsigned int&
ls,
unsigned int&
index,
std::string& nextFile, uint32_t& fsize,
int maxLS) {
778 if (maxLS >= 0 && ls > (
unsigned int)maxLS)
782 std::stringstream
ss;
783 unsigned int nextIndex =
index;
788 if (
stat(nextFile.c_str(), &buf) == 0) {
796 bool eolFound = (
stat(BUEoLSFile.c_str(), &buf) == 0);
799 if (
stat(nextFile.c_str(), &buf) == 0) {
806 if (indexFilesInLS < 0)
811 if ((
int)index < indexFilesInLS) {
814 <<
"Potential miss of index file in LS -: " << ls <<
". Missing " << nextFile <<
" because " 815 << indexFilesInLS - 1 <<
" is the highest index expected. Will not update fu.lock file";
824 if (maxLS >= 0 && ls > (
unsigned int)maxLS)
828 if (
stat(nextFile.c_str(), &buf) == 0) {
838 eolFound = (
stat(BUEoLSFile.c_str(), &buf) == 0);
847 edm::LogError(
"EvFDaqDirector") <<
"Error creating fu read/write lock stream " << strerror(errno);
849 edm::LogInfo(
"EvFDaqDirector") <<
"Initializing FU LOCK FILE";
858 open(
fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
865 <<
" create:" << create <<
" error:" << strerror(errno);
871 edm::LogError(
"EvFDaqDirector") <<
"problem with opening fuwritelock file stream -: " << strerror(errno);
902 if (checkIfExists ==
false ||
stat(fuBoLS.c_str(), &buf) != 0) {
903 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
909 const uint32_t currentLumiSection,
911 if (currentLumiSection > 0) {
914 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
916 int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
921 }
else if (doCreateBoLS) {
928 uint16_t& rawHeaderSize,
929 uint32_t& lsFromHeader,
930 int32_t& eventsFromHeader,
931 int64_t& fileSizeFromHeader,
937 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
940 <<
"parseFRDFileHeader - failed to open input file -: " << rawSourcePath <<
" : " << strerror(errno);
951 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
953 <<
"parseFRDFileHeader - failed to open input file -: " << rawSourcePath <<
" : " << strerror(errno);
965 ssize_t sz_read =
::read(infile, (
char*)&fileHead, buf_sz);
972 edm::LogError(
"EvFDaqDirector") <<
"parseFRDFileHeader - unable to read " << rawSourcePath <<
" : " 978 if ((
size_t)sz_read < buf_sz) {
979 edm::LogError(
"EvFDaqDirector") <<
"parseFRDFileHeader - file smaller than header: " << rawSourcePath;
987 if (frd_version == 0) {
990 edm::LogError(
"EvFDaqDirector") <<
"no header or invalid version string found in:" << rawSourcePath;
996 lseek(infile, 0, SEEK_SET);
999 eventsFromHeader = -1;
1000 fileSizeFromHeader = -1;
1005 if (headerSizeRaw < buf_sz) {
1006 edm::LogError(
"EvFDaqDirector") <<
"inconsistent header size: " << rawSourcePath <<
" size: " << headerSizeRaw
1007 <<
" v:" << frd_version;
1015 fileSizeFromHeader = (int64_t)fileHead.
fileSize_;
1024 uint16_t& rawHeaderSize,
1025 int64_t& fileSizeFromHeader,
1027 uint32_t serverLS) {
1032 size_t pos = 0, n_tokens = 0;
1033 while (n_tokens++ < 3 && (pos = jsonStem.find(
"_", pos + 1)) != std::string::npos) {
1035 std::string reducedJsonStem = jsonStem.substr(0, pos);
1037 std::ostringstream fileNameWithPID;
1039 fileNameWithPID << reducedJsonStem <<
"_pid" << std::setfill(
'0') << std::setw(5) <<
pid_ <<
".jsn";
1043 LogDebug(
"EvFDaqDirector") <<
"RAW parse -: " << rawSourcePath <<
" and JSON create " << jsonDestPath;
1047 int32_t nbEventsWrittenRaw;
1048 int64_t fileSizeFromRaw;
1050 rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw,
true,
true,
false);
1058 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1059 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1060 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1061 if (errno == EEXIST) {
1062 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1066 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath <<
" : " 1068 struct stat out_stat;
1069 if (
stat(jsonDestPath.c_str(), &out_stat) == 0) {
1071 <<
"grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: " 1073 if (unlink(jsonDestPath.c_str()) == -1) {
1075 <<
"grabNextJsonFromRaw - failed to remove -: " << jsonDestPath <<
" : " << strerror(errno);
1078 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1079 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to open output file (on retry) -: " 1080 << jsonDestPath <<
" : " << strerror(errno);
1085 std::stringstream
ss;
1086 ss <<
"{\"data\":[" << nbEventsWrittenRaw <<
"," << fileSizeFromRaw <<
",\"" << rawSourcePath <<
"\"]}";
1089 if (::
write(outfile, sstr.c_str(), sstr.size()) < 0) {
1090 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1091 <<
" : " << strerror(errno);
1095 if (serverLS && serverLS != lsFromRaw)
1096 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1097 <<
" and raw file header LS " << lsFromRaw;
1099 fileSizeFromHeader = fileSizeFromRaw;
1100 return nbEventsWrittenRaw;
1105 int64_t& fileSizeFromJson,
1110 std::ostringstream fileNameWithPID;
1112 << std::setw(5) <<
pid_ <<
".jsn";
1117 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
1121 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1122 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file -: " << jsonSourcePath <<
" : " 1124 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1125 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file (on retry) -: " 1126 << jsonSourcePath <<
" : " << strerror(errno);
1127 if (errno == ENOENT)
1133 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1134 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1135 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1136 if (errno == EEXIST) {
1137 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - destination file already exists -: " << jsonDestPath
1142 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file -: " << jsonDestPath <<
" : " 1144 struct stat out_stat;
1145 if (
stat(jsonDestPath.c_str(), &out_stat) == 0) {
1147 <<
"grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1148 if (unlink(jsonDestPath.c_str()) == -1) {
1150 <<
"grabNextJsonFile - failed to remove -: " << jsonDestPath <<
" : " << strerror(errno);
1153 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1154 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file (on retry) -: " 1155 << jsonDestPath <<
" : " << strerror(errno);
1161 const std::size_t buf_sz = 512;
1162 std::size_t tot_written = 0;
1163 std::unique_ptr<char> buf(
new char[buf_sz]);
1165 ssize_t sz, sz_read = 1, sz_write;
1166 while (sz_read > 0 && (sz_read = ::
read(infile, buf.get(), buf_sz)) > 0) {
1169 assert(sz_read - sz_write > 0);
1170 if ((sz = ::
write(
outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1177 }
while (sz_write < sz_read);
1182 if (tot_written > 0) {
1184 if (unlink(jsonSourcePath.c_str()) == -1) {
1185 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to remove -: " << jsonSourcePath <<
" : " 1190 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to copy json file or file was empty -: " 1199 std::stringstream
ss;
1202 if (tot_written <= buf_sz) {
1203 result = reader.
parse(buf.get(), deserializeRoot);
1207 boost::filesystem::ifstream ij(jsonDestPath);
1209 }
catch (boost::filesystem::filesystem_error
const& ex) {
1210 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1213 result = reader.
parse(ss.str(), deserializeRoot);
1216 if (tot_written <= buf_sz)
1218 edm::LogError(
"EvFDaqDirector") <<
"Failed to deserialize JSON file -: " << jsonDestPath <<
"\nERROR:\n" 1241 <<
"grabNextJsonFile - " 1242 <<
" error reading number of events from BU JSON; No input value. data -: " <<
data;
1248 fileSizeFromJson = -1;
1254 fileSizeFromJson = boost::lexical_cast<
long>(dataSize);
1255 }
catch (boost::bad_lexical_cast
const&) {
1257 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of Bytes from BU JSON. " 1258 <<
"Input value is -: " << dataSize;
1264 return boost::lexical_cast<
int>(
data);
1265 }
catch (boost::bad_lexical_cast
const&
e) {
1266 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of events from BU JSON. " 1267 <<
"Input value is -: " <<
data;
1268 }
catch (std::runtime_error
const& e) {
1270 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - std::runtime_error exception -: " << e.what();
1274 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1277 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1290 std::ostringstream fileNameWithPID;
1291 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid()
1293 jsonDestPath /= fileNameWithPID.str();
1295 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
1298 }
catch (boost::filesystem::filesystem_error
const& ex) {
1300 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1310 }
catch (boost::filesystem::filesystem_error
const& ex) {
1312 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1315 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
1318 boost::filesystem::ifstream ij(jsonDestPath);
1322 std::stringstream
ss;
1324 if (!reader.
parse(ss.str(), deserializeRoot)) {
1325 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1329 throw std::runtime_error(
"Cannot deserialize input JSON file");
1349 <<
" error reading number of events from BU JSON -: No input value " <<
data;
1351 return boost::lexical_cast<
int>(
data);
1352 }
catch (boost::filesystem::filesystem_error
const& ex) {
1355 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1356 }
catch (std::runtime_error
const&
e) {
1359 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile runtime Exception -: " << e.what();
1360 }
catch (boost::bad_lexical_cast
const&) {
1361 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile error parsing number of events from BU JSON. " 1362 <<
"Input value is -: " <<
data;
1366 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1375 uint32_t& closedServerLS,
1381 serverError =
false;
1383 boost::system::error_code
ec;
1397 boost::asio::streambuf request;
1398 std::ostream request_stream(&request);
1401 std::stringstream spath;
1402 spath << path <<
"&stopls=" << maxLS;
1406 request_stream <<
"GET " << path <<
" HTTP/1.1\r\n";
1408 request_stream <<
"Accept: */*\r\n";
1409 request_stream <<
"Connection: keep-alive\r\n\r\n";
1414 edm::LogInfo(
"EvFDaqDirector") <<
"reconnecting socket on received connection_reset";
1429 boost::asio::streambuf response;
1430 boost::asio::read_until(*
socket_, response,
"\r\n", ec);
1437 std::istream response_stream(&response);
1440 response_stream >> http_version;
1442 response_stream >> serverHttpStatus;
1445 std::getline(response_stream, status_message);
1446 if (!response_stream || http_version.substr(0, 5) !=
"HTTP/") {
1451 if (serverHttpStatus != 200) {
1452 edm::LogWarning(
"EvFDaqDirector") <<
"Response returned with status code " << serverHttpStatus;
1459 while (std::getline(response_stream, header) && header !=
"\r") {
1463 std::map<std::string, std::string> serverMap;
1464 while (std::getline(response_stream, fileInfo) && fileInfo !=
"\r") {
1465 auto pos = fileInfo.find(
"=");
1466 if (
pos == std::string::npos)
1468 auto stitle = fileInfo.substr(0,
pos);
1469 auto svalue = fileInfo.substr(
pos + 1);
1470 serverMap[stitle] = svalue;
1474 auto server_version = serverMap.find(
"version");
1475 assert(server_version != serverMap.end());
1477 auto server_run = serverMap.find(
"runnumber");
1478 assert(server_run != serverMap.end());
1481 auto server_state = serverMap.find(
"state");
1482 assert(server_state != serverMap.end());
1484 auto server_eols = serverMap.find(
"lasteols");
1485 assert(server_eols != serverMap.end());
1487 auto server_ls = serverMap.find(
"lumisection");
1489 int version_maj = 1;
1490 int version_min = 0;
1491 int version_rev = 0;
1493 auto* s_ptr = server_version->second.c_str();
1494 if (!server_version->second.empty() && server_version->second[0] ==
'"')
1496 auto res = sscanf(s_ptr,
"%d.%d.%d", &version_maj, &version_min, &version_rev);
1498 res = sscanf(s_ptr,
"%d.%d", &version_maj, &version_min);
1500 res = sscanf(s_ptr,
"%d", &version_maj);
1503 edm::LogWarning(
"EvFDaqDirector") <<
"Can not parse server version " << server_version->second;
1510 if (server_ls != serverMap.end())
1513 serverLS = closedServerLS + 1;
1516 if (s_state ==
"STARTING")
1518 auto server_file = serverMap.find(
"file");
1519 assert(server_file == serverMap.end());
1521 edm::LogInfo(
"EvFDaqDirector") <<
"Got STARTING notification with last EOLS " << closedServerLS;
1522 }
else if (s_state ==
"READY") {
1523 auto server_file = serverMap.find(
"file");
1524 if (server_file == serverMap.end()) {
1526 if (serverLS <= closedServerLS)
1527 serverLS = closedServerLS + 1;
1530 <<
"Got READY notification with last EOLS " << closedServerLS <<
" and no new file";
1534 auto server_fileprefix = serverMap.find(
"fileprefix");
1536 if (server_fileprefix != serverMap.end()) {
1537 auto pssize = server_fileprefix->second.size();
1538 if (pssize > 1 && server_fileprefix->second[0] ==
'"' && server_fileprefix->second[pssize - 1] ==
'"')
1539 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1541 fileprefix = server_fileprefix->second;
1545 auto ssize = server_file->second.size();
1546 if (ssize > 1 && server_file->second[0] ==
'"' && server_file->second[ssize - 1] ==
'"')
1547 filestem = server_file->second.substr(1, ssize - 2);
1549 filestem = server_file->second;
1550 assert(!filestem.empty());
1551 if (version_maj > 1) {
1552 nextFileRaw =
bu_run_dir_ +
"/" + fileprefix + filestem +
".raw";
1553 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1557 nextFileRaw =
bu_run_dir_ +
"/" + filestem +
".raw";
1558 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1559 nextFileJson = filestem +
".jsn";
1563 edm::LogInfo(
"EvFDaqDirector") <<
"Got READY notification with last EOLS " << closedServerLS <<
" new LS " 1564 << serverLS <<
" file:" << filestem;
1566 }
else if (s_state ==
"EOLS") {
1567 serverLS = closedServerLS + 1;
1568 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOLS notification with last EOLS " << closedServerLS;
1570 }
else if (s_state ==
"EOR") {
1572 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOR notification with last EOLS " << closedServerLS;
1574 }
else if (s_state ==
"NORUN") {
1575 auto err_msg = serverMap.find(
"errormessage");
1576 if (err_msg != serverMap.end())
1577 edm::LogWarning(
"EvFDaqDirector") <<
"Server NORUN -:" << server_state->second <<
" : " << err_msg->second;
1582 }
else if (s_state ==
"ERROR") {
1583 auto err_msg = serverMap.find(
"errormessage");
1584 if (err_msg != serverMap.end())
1585 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second <<
" : " << err_msg->second;
1587 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second;
1591 edm::LogWarning(
"EvFDaqDirector") <<
"Unknown Server state -:" << server_state->second;
1600 if (ec != boost::asio::error::eof) {
1613 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1639 uint16_t& rawHeaderSize,
1640 int32_t& serverEventsInNewFile,
1641 int64_t& fileSizeFromMetadata,
1650 int stopFileLS = -1;
1653 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1654 if (stopFileCheck == 0)
1660 if (stopFileLS >= 0 && (
int)ls >= stopFileLS)
1664 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: " 1678 timeval ts_lockbegin;
1679 gettimeofday(&ts_lockbegin,
nullptr);
1682 uint32_t serverLS, closedServerLS;
1683 unsigned int serverHttpStatus;
1690 int maxLS = stopFileLS < 0 ? -1 :
std::max(stopFileLS, (
int)currentLumiSection);
1691 bool rawHeader =
false;
1693 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1703 if (currentLumiSection == 0) {
1711 if (closedServerLS >= currentLumiSection) {
1712 for (uint32_t
i =
std::max(currentLumiSection, 1
U);
i <= closedServerLS;
i++)
1717 bool fileFound =
true;
1721 serverEventsInNewFile =
1722 grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS);
1724 serverEventsInNewFile =
grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1727 if (serverEventsInNewFile < 0 && rawFd != -1) {
1746 ls =
std::max(currentLumiSection, serverLS);
1747 else if (fileStatus ==
newFile) {
1748 assert(serverLS >= ls);
1750 }
else if (fileStatus ==
noFile) {
1754 edm::LogWarning(
"EvFDaqDirector") <<
"Server reported LS " << serverLS
1755 <<
" which is smaller than currently open LS " << ls <<
". Ignoring response";
1767 if (!boost::filesystem::is_directory(openPath)) {
1768 LogDebug(
"EvFDaqDirector") <<
"<open> FU dir not found. Creating... -:" << openPath.string();
1769 boost::filesystem::create_directories(openPath);
1774 boost::filesystem::ifstream ij(file);
1778 if (!reader.
parse(ij, deserializeRoot)) {
1779 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" <<
file;
1783 int ret = deserializeRoot.
get(
"lastLS",
"").
asInt();
1793 std::stringstream
ss;
1794 ss << fileprefix << std::setfill(
'0') << std::setw(4) << lscount <<
"_EoLS.jsn";
1795 fullpath = ss.str();
1797 }
while (
stat(fullpath.c_str(), &buf) == 0);
1812 std::vector<std::string>
destinations = tsPset.getParameter<std::vector<std::string>>(
"destinations");
1813 for (
auto&
dest : destinations)
1814 destinationsVal.append(
dest);
1815 (*transferSystemJson_)[
"destinations"] = destinationsVal;
1818 std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>(
"transferModes");
1819 for (
auto&
mode : modes)
1821 (*transferSystemJson_)[
"transferModes"] = modesVal;
1823 for (
auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1824 if (psKeyItr->first !=
"destinations" && psKeyItr->first !=
"transferModes") {
1827 for (
auto&
mode : modes) {
1829 if (!streamDef.
existsAs<std::vector<std::string>>(
mode,
true))
1831 <<
" Missing transfer system specification for -:" << psKeyItr->first <<
" (transferMode " <<
mode 1833 std::vector<std::string> streamDestinations = streamDef.
getParameter<std::vector<std::string>>(
mode);
1837 if (streamDestinations.empty())
1839 <<
" Missing transter system destination(s) for -: " << psKeyItr->first <<
", mode:" <<
mode;
1841 for (
auto& sdest : streamDestinations) {
1842 bool sDestValid =
false;
1843 sDestsValue.append(sdest);
1844 for (
auto&
dest : destinations) {
1850 <<
" Invalid transter system destination specified for -: " << psKeyItr->first <<
", mode:" <<
mode 1851 <<
", dest:" << sdest;
1853 streamVal[
mode] = sDestsValue;
1855 (*transferSystemJson_)[psKeyItr->first] = streamVal;
1860 throw cms::Exception(
"EvFDaqDirector") <<
"transferSystem PSet not found";
1867 streamRequestName = stream;
1869 std::stringstream
msg;
1870 msg <<
"Transfer system mode definitions missing for -: " << stream;
1874 edm::LogWarning(
"EvFDaqDirector") << msg.str() <<
" (permissive mode)";
1881 <<
"Selected mode string is not provided as DaqDirector parameter." 1882 <<
"Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1886 throw cms::Exception(
"EvFDaqDirector") <<
"Selected mode string is not provided as DaqDirector parameter.";
1890 std::stringstream
msg;
1891 msg <<
"Selected transfer mode " <<
selectedTransferMode_ <<
" is not specified for stream " << streamRequestName;
1895 edm::LogWarning(
"EvFDaqDirector") << msg.str() <<
" (permissive mode)";
1905 ret += (*it).asString();
1920 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1922 ac->second = streamType;
1929 tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
1931 return search_ac->second;
1933 edm::LogInfo(
"EvFDaqDirector") <<
" No merging type specified for stream " << stream <<
". Using default value";
1935 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1937 ac->second = defaultName;
1944 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1945 close(proc_flag_fd);
unsigned int maxNumberOfThreads() const
T getParameter(std::string const &) const
std::string getStreamDestinations(std::string const &stream) const
Value get(UInt index, const Value &defaultValue) const
std::vector< std::string > & getData()
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPreallocate(Preallocate::slot_type const &iSlot)
const_iterator begin() const
bool existsAs(std::string const ¶meterName, bool trackiness=true) const
checks if a parameter exists as a given type
boost::asio::io_service io_service_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
jsoncollector::DataPointDefinition * dpd_
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
ret
prodAgent to be discontinued
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
ParameterSet const & getParameterSet(ParameterSetID const &id)
pthread_mutex_t init_lock_
LuminosityBlockID const & luminosityBlockID() const
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Value & append(const Value &value)
Append value to array at the end.
void createProcessingNotificationMaybe() const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
bool fileBrokerHostFromCfg_
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
ParameterSetID const & parameterSetID() const
void createRunOpendirMaybe()
unsigned int maxNumberOfStreams() const
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void setComment(std::string const &value)
std::mutex * fileDeleteLockPtr_
std::string mergeTypePset_
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
void tryInitializeFuLockFile()
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void openFULockfileStream(bool create)
int getNFilesFromEoLS(std::string BUEoLSFile)
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int getLumisectionToStart() const
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
void checkMergeTypePSet(edm::ProcessContext const &pc)
ParameterSet const & getParameterSet(std::string const &) const
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void deserialize(Json::Value &root) override
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
LuminosityBlockNumber_t luminosityBlock() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::string & getDefinition()
def remove(d, key, TELL=False)
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string findHighestRunDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
bool fileBrokerUseLocalLock_
void postEndRun(edm::GlobalContext const &globalContext)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS)
Unserialize a JSON document into a Value.
static const std::vector< std::string > MergeTypeNames_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
const_iterator end() const
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Iterator for object and array value.
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
bool fileBrokerKeepAlive_
std::string getRunOpenDirPath() const
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
std::string getFFFParamsFilePathOnBU() const
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string getEoRFilePathOnFU() const
std::string bu_run_open_dir_
array value (ordered list)