25 #include <boost/lexical_cast.hpp>
26 #include <boost/algorithm/string.hpp>
37 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {
"",
"DAT",
"PB",
"JSNDATA"};
40 : base_dir_(
pset.getUntrackedParameter<
std::
string>(
"baseDir")),
41 bu_base_dir_(
pset.getUntrackedParameter<
std::
string>(
"buBaseDir")),
42 run_(
pset.getUntrackedParameter<unsigned
int>(
"runNumber")),
43 useFileBroker_(
pset.getUntrackedParameter<
bool>(
"useFileBroker")),
44 fileBrokerHostFromCfg_(
pset.getUntrackedParameter<
bool>(
"fileBrokerHostFromCfg",
true)),
45 fileBrokerHost_(
pset.getUntrackedParameter<
std::
string>(
"fileBrokerHost",
"InValid")),
46 fileBrokerPort_(
pset.getUntrackedParameter<
std::
string>(
"fileBrokerPort",
"8080")),
47 fileBrokerKeepAlive_(
pset.getUntrackedParameter<
bool>(
"fileBrokerKeepAlive",
true)),
48 fileBrokerUseLocalLock_(
pset.getUntrackedParameter<
bool>(
"fileBrokerUseLocalLock",
true)),
49 fuLockPollInterval_(
pset.getUntrackedParameter<unsigned
int>(
"fuLockPollInterval", 2000)),
50 outputAdler32Recheck_(
pset.getUntrackedParameter<
bool>(
"outputAdler32Recheck",
false)),
51 requireTSPSet_(
pset.getUntrackedParameter<
bool>(
"requireTransfersPSet",
false)),
52 selectedTransferMode_(
pset.getUntrackedParameter<
std::
string>(
"selectedTransferMode",
"")),
53 mergeTypePset_(
pset.getUntrackedParameter<
std::
string>(
"mergingPset",
"")),
54 directorBU_(
pset.getUntrackedParameter<
bool>(
"directorIsBU",
false)),
55 hltSourceDirectory_(
pset.getUntrackedParameter<
std::
string>(
"hltSourceDirectory",
"")),
59 fu_readwritelock_fd_(-1),
60 fulocal_rwlock_fd_(-1),
61 fulocal_rwlock_fd2_(-1),
62 bu_w_lock_stream(nullptr),
63 bu_r_lock_stream(nullptr),
64 fu_rw_lock_stream(nullptr),
65 dirManager_(base_dir_),
67 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
68 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
69 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
72 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
81 gethostname(hostname, 32);
84 char* fuLockPollIntervalPtr = std::getenv(
"FFF_LOCKPOLLINTERVAL");
85 if (fuLockPollIntervalPtr) {
90 }
catch (boost::bad_lexical_cast
const&) {
96 char* fileBrokerParamPtr = std::getenv(
"FFF_USEFILEBROKER");
97 if (fileBrokerParamPtr) {
101 }
catch (boost::bad_lexical_cast
const&) {
110 if (
stat(
"/etc/appliance/bus.config", &
buf) == 0) {
115 throw cms::Exception(
"EvFDaqDirector") <<
"No file service or BU data address information";
118 <<
"fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
126 char* startFromLSPtr = std::getenv(
"FFF_STARTFROMLS");
127 if (startFromLSPtr) {
131 }
catch (boost::bad_lexical_cast
const&) {
137 char* fileBrokerUseLockParamPtr = std::getenv(
"FFF_FILEBROKERUSELOCALLOCK");
138 if (fileBrokerUseLockParamPtr) {
141 edm::LogInfo(
"EvFDaqDirector") <<
"Setting fileBrokerUseLocalLock parameter by environment string: "
143 }
catch (boost::bad_lexical_cast
const&) {
150 std::stringstream
ss;
151 ss <<
"run" << std::setfill(
'0') << std::setw(6) <<
run_;
153 ss = std::stringstream();
158 ss = std::stringstream();
163 int retval =
mkdir(
base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164 if (retval != 0 && errno != EEXIST) {
166 <<
" Error checking for base dir -: " <<
base_dir_ <<
" mkdir error:" << strerror(errno);
171 retval =
mkdir(
run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172 if (retval != 0 && errno != EEXIST) {
174 <<
" Error creating run dir -: " <<
run_dir_ <<
" mkdir error:" << strerror(errno);
182 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
185 <<
" Error creating/opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
186 chmod(fulocal_lock_.c_str(), 0777);
190 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
193 <<
" Error opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
205 if (retval != 0 && errno != EEXIST) {
207 <<
" Error creating bu run dir -: " <<
bu_run_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
211 if (retval != 0 && errno != EEXIST) {
213 <<
" Error creating bu run open dir -: " <<
bu_run_open_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
217 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
219 edm::LogWarning(
"EvFDaqDirector") <<
"problem with creating filedesc for buwritelock -: " << strerror(errno);
224 edm::LogWarning(
"EvFDaqDirector") <<
"Error creating write lock stream -: " << strerror(errno);
238 retval =
mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239 if (retval != 0 && errno != EEXIST)
241 <<
" Error creating bu run dir -: " << hltdir <<
" mkdir error:" << strerror(errno) <<
"\n";
243 std::filesystem::copy_file(
hltSourceDirectory_ +
"/HltConfig.py", tmphltdir +
"/HltConfig.py");
244 std::filesystem::copy_file(
hltSourceDirectory_ +
"/fffParameters.jsn", tmphltdir +
"/fffParameters.jsn");
246 std::string optfiles[3] = {
"hltinfo",
"blacklist",
"whitelist"};
247 for (
auto& optfile : optfiles) {
249 std::filesystem::copy_file(
hltSourceDirectory_ +
"/" + optfile, tmphltdir +
"/" + optfile);
254 std::filesystem::rename(tmphltdir, hltdir);
263 if (retval != 0 && errno != EEXIST) {
265 <<
" Error checking for bu base dir -: " <<
bu_base_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
276 std::stringstream sstp;
283 if (!
stat(defPath.c_str(), &statbuf))
284 edm::LogInfo(
"EvFDaqDirector") <<
"found JSD file in ramdisk -: " << defPath;
287 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
288 defPath =
std::string(std::getenv(
"CMSSW_BASE")) +
"/" + defPathSuffix;
289 if (
stat(defPath.c_str(), &statbuf)) {
290 defPath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + defPathSuffix;
291 if (
stat(defPath.c_str(), &statbuf)) {
292 defPath = defPathSuffix;
298 DataPointDefinition::getDataPointDefinitionFor(defPath,
dpd_, &defLabel);
305 boost::system::error_code ec;
306 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
331 "Service used for file locking arbitration and for propagating information between other EvF components");
332 desc.addUntracked<
std::string>(
"baseDir",
".")->setComment(
"Local base directory for run output");
333 desc.addUntracked<
std::string>(
"buBaseDir",
".")->setComment(
"BU base ramdisk directory ");
334 desc.addUntracked<
unsigned int>(
"runNumber", 0)->setComment(
"Run Number in ramdisk to open");
335 desc.addUntracked<
bool>(
"useFileBroker",
false)
336 ->setComment(
"Use BU file service to grab input data instead of NFS file locking");
337 desc.addUntracked<
bool>(
"fileBrokerHostFromCfg",
true)
338 ->setComment(
"Allow service to discover BU address from hltd configuration");
339 desc.addUntracked<
std::string>(
"fileBrokerHost",
"InValid")->setComment(
"BU file service host.");
340 desc.addUntracked<
std::string>(
"fileBrokerPort",
"8080")->setComment(
"BU file service port");
341 desc.addUntracked<
bool>(
"fileBrokerKeepAlive",
true)
342 ->setComment(
"Use keep alive to avoid using large number of sockets");
343 desc.addUntracked<
bool>(
"fileBrokerUseLocalLock",
true)
344 ->setComment(
"Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
345 desc.addUntracked<
unsigned int>(
"fuLockPollInterval", 2000)
346 ->setComment(
"Lock polling interval in microseconds for the input directory file lock");
347 desc.addUntracked<
bool>(
"outputAdler32Recheck",
false)
348 ->setComment(
"Check Adler32 of per-process output files while micro-merging");
349 desc.addUntracked<
bool>(
"requireTransfersPSet",
false)
350 ->setComment(
"Require complete transferSystem PSet in the process configuration");
352 ->setComment(
"Selected transfer mode (choice in Lvl0 propagated as Python parameter");
353 desc.addUntracked<
bool>(
"directorIsBU",
false)->setComment(
"BU director mode used for testing");
354 desc.addUntracked<
std::string>(
"hltSourceDirectory",
"")->setComment(
"BU director mode source directory");
356 ->setComment(
"Name of merging PSet to look for merging type definitions for streams");
357 descriptions.
add(
"EvFDaqDirector",
desc);
388 edm::LogWarning(
"EvFDaqDirector") <<
" Handles to check for files to delete were not set by the input source...";
395 if (it->second->lumi_ ==
ls) {
499 <<
". error = " << strerror(errno);
507 uint16_t& rawHeaderSize,
509 bool& setExceptionState) {
514 int lock_attempts = 0;
515 long total_lock_attempts = 0;
521 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
522 if (stopFileCheck == 0)
528 if (stopFileLS >= 0 && (
int)
ls >= stopFileLS)
532 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: "
538 timeval ts_lockbegin;
539 gettimeofday(&ts_lockbegin,
nullptr);
541 while (retval == -1) {
550 if (lock_attempts > 5000000 || errno == 116) {
553 <<
"Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
555 edm::LogWarning(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 seconds. Checking if run directory and "
556 "fu.lock file are present -: errno "
557 << errno <<
":" << strerror(errno) << std::endl;
560 edm::LogWarning(
"EvFDaqDirector") <<
"Detected local EoLS for lumisection " <<
ls;
572 if (total_lock_attempts > 5 * 60000000) {
573 edm::LogError(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 minutes. Stopping polling activity.";
579 gettimeofday(&ts_lockend,
nullptr);
580 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
582 lockWaitTime = deltat;
589 gettimeofday(&ts_lockend, 0);
593 int fu_readwritelock_fd2 = open(
fulockfile_.c_str(), O_RDWR, S_IRWXU);
594 if (fu_readwritelock_fd2 == -1)
596 <<
" create. error:" << strerror(errno);
598 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2,
"r+");
601 if (fu_rw_lock_stream2 !=
nullptr) {
605 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
609 fscanf(fu_rw_lock_stream2,
"%u %u", &readLs, &
readIndex);
612 unsigned int currentLs = readLs;
613 bool bumpedOk =
false;
616 if (
ls &&
ls + 1 < currentLs)
620 bumpedOk =
bumpFile(readLs,
readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
622 if (
ls && readLs > currentLs && currentLs >
ls) {
624 readLs = currentLs =
ls;
629 if (
ls == 0 && readLs > currentLs) {
644 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
646 ftruncate(fu_readwritelock_fd2, 0);
648 fprintf(fu_rw_lock_stream2,
"%u %u", readLs,
readIndex + 1);
649 fflush(fu_rw_lock_stream2);
650 fsync(fu_readwritelock_fd2);
652 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" <<
readIndex + 1;
655 <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
656 setExceptionState =
true;
659 }
else if (currentLs < readLs) {
661 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
663 ftruncate(fu_readwritelock_fd2, 0);
665 fprintf(fu_rw_lock_stream2,
"%u %u", readLs,
readIndex);
666 fflush(fu_rw_lock_stream2);
667 fsync(fu_readwritelock_fd2);
668 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" <<
readIndex;
671 <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
672 setExceptionState =
true;
677 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for reading failed with error "
681 edm::LogError(
"EvFDaqDirector") <<
"fu read/write lock stream is invalid " << strerror(errno);
683 fclose(fu_rw_lock_stream2);
686 timeval ts_preunlock;
687 gettimeofday(&ts_preunlock, 0);
688 int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
689 double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
700 edm::LogError(
"EvFDaqDirector") <<
"Error unlocking the fu.lock " << strerror(errno);
703 edm::LogDebug(
"EvFDaqDirector") <<
"Waited during lock -: " << locked_period <<
" seconds";
706 if (fileStatus ==
noFile) {
711 if (stopFileLS >= 0 && (
int)
ls > stopFileLS) {
712 edm::LogInfo(
"EvFDaqDirector") <<
"Reached maximum lumisection set by hltd";
720 std::ifstream ij(BUEoLSFile);
724 if (!
reader.parse(ij, deserializeRoot)) {
725 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" << BUEoLSFile;
731 dp.deserialize(deserializeRoot);
739 while (!
def.empty()) {
741 if (
def.find(
'/') == 0)
749 DataPointDefinition::getDataPointDefinitionFor(
fullpath, &eolsDpd, &defLabel);
754 DataPointDefinition::getDataPointDefinitionFor(
fullpath, &eolsDpd, &defLabel);
756 for (
unsigned int i = 0;
i < eolsDpd.
getNames().size();
i++)
757 if (eolsDpd.
getNames().at(
i) ==
"NFiles")
763 if (
def.size() <= 1 ||
def.find(
'/') == std::string::npos) {
774 edm::LogError(
"EvFDaqDirector") <<
" error reading number of files from BU JSON -: " << BUEoLSFile;
777 return boost::lexical_cast<int>(
data);
784 uint16_t& rawHeaderSize,
786 bool& setExceptionState) {
798 if (maxLS >= 0 &&
ls > (
unsigned int)maxLS)
802 std::stringstream
ss;
803 unsigned int nextIndex =
index;
808 if (
stat(nextFileJson.c_str(), &
buf) == 0) {
810 nextFile = nextFileJson;
819 nextFile = nextFileRaw;
825 if (
stat(BUEoLSFile.c_str(), &
buf) == 0) {
827 if (
stat(nextFileJson.c_str(), &
buf) == 0) {
829 nextFile = nextFileJson;
834 nextFile = nextFileRaw;
839 if (indexFilesInLS < 0)
844 if ((
int)
index < indexFilesInLS) {
847 <<
"Potential miss of index file in LS -: " <<
ls <<
". Missing " << nextFile <<
" because "
848 << indexFilesInLS - 1 <<
" is the highest index expected. Will not update fu.lock file";
849 setExceptionState =
true;
858 if (maxLS >= 0 &&
ls > (
unsigned int)maxLS)
863 if (
stat(nextFileJson.c_str(), &
buf) == 0) {
866 nextFile = nextFileJson;
871 nextFile = nextFileRaw;
883 edm::LogError(
"EvFDaqDirector") <<
"Error creating fu read/write lock stream " << strerror(errno);
885 edm::LogInfo(
"EvFDaqDirector") <<
"Initializing FU LOCK FILE";
894 open(
fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
901 <<
" create:" <<
create <<
" error:" << strerror(errno);
907 edm::LogError(
"EvFDaqDirector") <<
"problem with opening fuwritelock file stream -: " << strerror(errno);
938 if (checkIfExists ==
false ||
stat(fuBoLS.c_str(), &
buf) != 0) {
939 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
945 const uint32_t currentLumiSection,
948 if (currentLumiSection > 0) {
955 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
961 }
else if (doCreateBoLS) {
968 uint16_t& rawHeaderSize,
969 uint32_t& lsFromHeader,
970 int32_t& eventsFromHeader,
971 int64_t& fileSizeFromHeader,
977 if ((
infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
980 <<
"parseFRDFileHeader - failed to open input file -: " << rawSourcePath <<
" : " << strerror(errno);
991 if ((
infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
993 <<
"parseFRDFileHeader - failed to open input file -: " << rawSourcePath <<
" : " << strerror(errno);
1005 ssize_t sz_read =
::read(
infile, (
char*)&fileHead, buf_sz);
1012 edm::LogError(
"EvFDaqDirector") <<
"parseFRDFileHeader - unable to read " << rawSourcePath <<
" : "
1018 if ((
size_t)sz_read < buf_sz) {
1019 edm::LogError(
"EvFDaqDirector") <<
"parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1027 if (frd_version == 0) {
1029 if (requireHeader) {
1030 edm::LogError(
"EvFDaqDirector") <<
"no header or invalid version string found in:" << rawSourcePath;
1036 lseek(
infile, 0, SEEK_SET);
1039 eventsFromHeader = -1;
1040 fileSizeFromHeader = -1;
1044 uint32_t headerSizeRaw = fileHead.headerSize_;
1045 if (headerSizeRaw < buf_sz) {
1046 edm::LogError(
"EvFDaqDirector") <<
"inconsistent header size: " << rawSourcePath <<
" size: " << headerSizeRaw
1047 <<
" v:" << frd_version;
1053 lsFromHeader = fileHead.lumiSection_;
1054 eventsFromHeader = (int32_t)fileHead.eventCount_;
1055 fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1056 rawHeaderSize = fileHead.headerSize_;
1064 if ((
infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1065 edm::LogWarning(
"EvFDaqDirector") <<
"rawFileHasHeader - failed to open input file -: " << rawSourcePath <<
" : "
1072 ssize_t sz_read =
::read(
infile, (
char*)&fileHead, buf_sz);
1075 edm::LogError(
"EvFDaqDirector") <<
"rawFileHasHeader - unable to read " << rawSourcePath <<
" : "
1081 if ((
size_t)sz_read < buf_sz) {
1082 edm::LogError(
"EvFDaqDirector") <<
"rawFileHasHeader - file smaller than header: " << rawSourcePath;
1092 if (frd_version > 0) {
1093 rawHeaderSize = fileHead.headerSize_;
1103 uint16_t& rawHeaderSize,
1104 int64_t& fileSizeFromHeader,
1112 size_t pos = 0, n_tokens = 0;
1113 while (n_tokens++ < 3 && (
pos = jsonStem.find(
'_',
pos + 1)) != std::string::npos) {
1117 std::ostringstream fileNameWithPID;
1119 fileNameWithPID << reducedJsonStem <<
"_pid" << std::setfill(
'0') << std::setw(5) <<
pid_ <<
".jsn";
1123 LogDebug(
"EvFDaqDirector") <<
"RAW parse -: " << rawSourcePath <<
" and JSON create " << jsonDestPath;
1127 int32_t nbEventsWrittenRaw;
1128 int64_t fileSizeFromRaw;
1130 rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw,
true,
true, closeFile);
1138 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1139 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1140 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1141 if (errno == EEXIST) {
1142 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1146 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath <<
" : "
1148 struct stat out_stat;
1149 if (
stat(jsonDestPath.c_str(), &out_stat) == 0) {
1151 <<
"grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1153 if (unlink(jsonDestPath.c_str()) == -1) {
1155 <<
"grabNextJsonFromRaw - failed to remove -: " << jsonDestPath <<
" : " << strerror(errno);
1158 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1159 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to open output file (on retry) -: "
1160 << jsonDestPath <<
" : " << strerror(errno);
1165 std::stringstream
ss;
1166 ss <<
"{\"data\":[" << nbEventsWrittenRaw <<
"," << fileSizeFromRaw <<
",\"" << rawSourcePath <<
"\"]}";
1170 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1171 <<
" : " << strerror(errno);
1175 if (serverLS && serverLS != lsFromRaw)
1176 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1177 <<
" and raw file header LS " << lsFromRaw;
1179 fileSizeFromHeader = fileSizeFromRaw;
1180 return nbEventsWrittenRaw;
1185 int64_t& fileSizeFromJson,
1190 std::ostringstream fileNameWithPID;
1191 fileNameWithPID <<
std::filesystem::path(rawSourcePath).stem().string() <<
"_pid" << std::setfill(
'0')
1192 << std::setw(5) <<
pid_ <<
".jsn";
1197 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
1201 if ((
infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1202 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file -: " << jsonSourcePath <<
" : "
1204 if ((
infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1205 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file (on retry) -: "
1206 << jsonSourcePath <<
" : " << strerror(errno);
1207 if (errno == ENOENT)
1213 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1214 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1215 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1216 if (errno == EEXIST) {
1217 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - destination file already exists -: " << jsonDestPath
1222 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file -: " << jsonDestPath <<
" : "
1224 struct stat out_stat;
1225 if (
stat(jsonDestPath.c_str(), &out_stat) == 0) {
1227 <<
"grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1228 if (unlink(jsonDestPath.c_str()) == -1) {
1230 <<
"grabNextJsonFile - failed to remove -: " << jsonDestPath <<
" : " << strerror(errno);
1233 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1234 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file (on retry) -: "
1235 << jsonDestPath <<
" : " << strerror(errno);
1241 const std::size_t buf_sz = 512;
1242 std::size_t tot_written = 0;
1243 std::unique_ptr<char>
buf(
new char[buf_sz]);
1245 ssize_t sz, sz_read = 1, sz_write;
1246 while (sz_read > 0 && (sz_read = ::
read(
infile,
buf.get(), buf_sz)) > 0) {
1249 assert(sz_read - sz_write > 0);
1250 if ((sz = ::
write(
outfile,
buf.get() + sz_write, sz_read - sz_write)) < 0) {
1257 }
while (sz_write < sz_read);
1262 if (tot_written > 0) {
1264 if (unlink(jsonSourcePath.c_str()) == -1) {
1265 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to remove -: " << jsonSourcePath <<
" : "
1270 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to copy json file or file was empty -: "
1279 std::stringstream
ss;
1282 if (tot_written <= buf_sz) {
1287 std::ifstream ij(jsonDestPath);
1289 }
catch (std::filesystem::filesystem_error
const& ex) {
1290 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1296 if (tot_written <= buf_sz)
1298 edm::LogError(
"EvFDaqDirector") <<
"Failed to deserialize JSON file -: " << jsonDestPath <<
"\nERROR:\n"
1299 <<
reader.getFormatedErrorMessages() <<
"CONTENT:\n"
1306 dp.deserialize(deserializeRoot);
1310 if (
i <
dp.getData().size()) {
1317 if (!
dp.getData().empty())
1321 <<
"grabNextJsonFile - "
1322 <<
" error reading number of events from BU JSON; No input value. data -: " <<
data;
1328 fileSizeFromJson = -1;
1331 if (
i <
dp.getData().size()) {
1334 fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1335 }
catch (boost::bad_lexical_cast
const&) {
1337 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1338 <<
"Input value is -: " << dataSize;
1344 return boost::lexical_cast<int>(
data);
1345 }
catch (boost::bad_lexical_cast
const&
e) {
1346 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
1347 <<
"Input value is -: " <<
data;
1348 }
catch (std::runtime_error
const&
e) {
1350 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - std::runtime_error exception -: " <<
e.what();
1354 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " <<
e.what();
1357 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1370 std::ostringstream fileNameWithPID;
1371 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid()
1373 jsonDestPath /= fileNameWithPID.str();
1375 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
1378 }
catch (std::filesystem::filesystem_error
const& ex) {
1380 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1390 }
catch (std::filesystem::filesystem_error
const& ex) {
1392 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1395 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
1398 std::ifstream ij(jsonDestPath);
1402 std::stringstream
ss;
1404 if (!
reader.parse(
ss.str(), deserializeRoot)) {
1405 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1407 <<
reader.getFormatedErrorMessages() <<
"CONTENT:\n"
1409 throw std::runtime_error(
"Cannot deserialize input JSON file");
1415 dp.deserialize(deserializeRoot);
1419 if (
i <
dp.getData().size()) {
1425 if (!
dp.getData().empty())
1429 <<
" error reading number of events from BU JSON -: No input value " <<
data;
1431 return boost::lexical_cast<int>(
data);
1432 }
catch (std::filesystem::filesystem_error
const& ex) {
1435 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1436 }
catch (std::runtime_error
const&
e) {
1439 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile runtime Exception -: " <<
e.what();
1440 }
catch (boost::bad_lexical_cast
const&) {
1441 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile error parsing number of events from BU JSON. "
1442 <<
"Input value is -: " <<
data;
1446 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " <<
e.what();
1455 uint32_t& closedServerLS,
1461 serverError =
false;
1463 boost::system::error_code ec;
1471 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::connect error -:" << ec;
1477 boost::asio::streambuf request;
1478 std::ostream request_stream(&request);
1481 std::stringstream spath;
1482 spath <<
path <<
"&stopls=" << maxLS;
1486 request_stream <<
"GET " <<
path <<
" HTTP/1.1\r\n";
1488 request_stream <<
"Accept: */*\r\n";
1489 request_stream <<
"Connection: keep-alive\r\n\r\n";
1494 edm::LogInfo(
"EvFDaqDirector") <<
"reconnecting socket on received connection_reset";
1498 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::connect error -:" << ec;
1504 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::write error -:" << ec;
1509 boost::asio::streambuf response;
1510 boost::asio::read_until(*
socket_, response,
"\r\n", ec);
1512 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::read_until error -:" << ec;
1517 std::istream response_stream(&response);
1520 response_stream >> http_version;
1522 response_stream >> serverHttpStatus;
1525 std::getline(response_stream, status_message);
1526 if (!response_stream || http_version.substr(0, 5) !=
"HTTP/") {
1531 if (serverHttpStatus != 200) {
1532 edm::LogWarning(
"EvFDaqDirector") <<
"Response returned with status code " << serverHttpStatus;
1539 while (std::getline(response_stream,
header) &&
header !=
"\r") {
1543 std::map<std::string, std::string> serverMap;
1544 while (std::getline(response_stream, fileInfo) && fileInfo !=
"\r") {
1545 auto pos = fileInfo.find(
'=');
1546 if (
pos == std::string::npos)
1548 auto stitle = fileInfo.substr(0,
pos);
1549 auto svalue = fileInfo.substr(
pos + 1);
1550 serverMap[stitle] = svalue;
1554 auto server_version = serverMap.find(
"version");
1555 assert(server_version != serverMap.end());
1557 auto server_run = serverMap.find(
"runnumber");
1558 assert(server_run != serverMap.end());
1561 auto server_state = serverMap.find(
"state");
1562 assert(server_state != serverMap.end());
1564 auto server_eols = serverMap.find(
"lasteols");
1565 assert(server_eols != serverMap.end());
1567 auto server_ls = serverMap.find(
"lumisection");
1569 int version_maj = 1;
1570 int version_min = 0;
1571 int version_rev = 0;
1573 auto* s_ptr = server_version->second.c_str();
1574 if (!server_version->second.empty() && server_version->second[0] ==
'"')
1576 auto res = sscanf(s_ptr,
"%d.%d.%d", &version_maj, &version_min, &version_rev);
1578 res = sscanf(s_ptr,
"%d.%d", &version_maj, &version_min);
1580 res = sscanf(s_ptr,
"%d", &version_maj);
1583 edm::LogWarning(
"EvFDaqDirector") <<
"Can not parse server version " << server_version->second;
1590 if (server_ls != serverMap.end())
1593 serverLS = closedServerLS + 1;
1596 if (s_state ==
"STARTING")
1598 auto server_file = serverMap.find(
"file");
1599 assert(server_file == serverMap.end());
1601 edm::LogInfo(
"EvFDaqDirector") <<
"Got STARTING notification with last EOLS " << closedServerLS;
1602 }
else if (s_state ==
"READY") {
1603 auto server_file = serverMap.find(
"file");
1604 if (server_file == serverMap.end()) {
1606 if (serverLS <= closedServerLS)
1607 serverLS = closedServerLS + 1;
1610 <<
"Got READY notification with last EOLS " << closedServerLS <<
" and no new file";
1614 auto server_fileprefix = serverMap.find(
"fileprefix");
1616 if (server_fileprefix != serverMap.end()) {
1617 auto pssize = server_fileprefix->second.size();
1618 if (pssize > 1 && server_fileprefix->second[0] ==
'"' && server_fileprefix->second[pssize - 1] ==
'"')
1619 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1621 fileprefix = server_fileprefix->second;
1625 auto ssize = server_file->second.size();
1626 if (ssize > 1 && server_file->second[0] ==
'"' && server_file->second[ssize - 1] ==
'"')
1627 filestem = server_file->second.substr(1, ssize - 2);
1629 filestem = server_file->second;
1630 assert(!filestem.empty());
1631 if (version_maj > 1) {
1632 nextFileRaw =
bu_run_dir_ +
"/" + fileprefix + filestem +
".raw";
1633 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1637 nextFileRaw =
bu_run_dir_ +
"/" + filestem +
".raw";
1638 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1639 nextFileJson = filestem +
".jsn";
1643 edm::LogInfo(
"EvFDaqDirector") <<
"Got READY notification with last EOLS " << closedServerLS <<
" new LS "
1644 << serverLS <<
" file:" << filestem;
1646 }
else if (s_state ==
"EOLS") {
1647 serverLS = closedServerLS + 1;
1648 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOLS notification with last EOLS " << closedServerLS;
1650 }
else if (s_state ==
"EOR") {
1652 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOR notification with last EOLS " << closedServerLS;
1654 }
else if (s_state ==
"NORUN") {
1655 auto err_msg = serverMap.find(
"errormessage");
1656 if (err_msg != serverMap.end())
1657 edm::LogWarning(
"EvFDaqDirector") <<
"Server NORUN -:" << server_state->second <<
" : " << err_msg->second;
1662 }
else if (s_state ==
"ERROR") {
1663 auto err_msg = serverMap.find(
"errormessage");
1664 if (err_msg != serverMap.end())
1665 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second <<
" : " << err_msg->second;
1667 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second;
1671 edm::LogWarning(
"EvFDaqDirector") <<
"Unknown Server state -:" << server_state->second;
1680 if (ec != boost::asio::error::eof) {
1681 edm::LogWarning(
"EvFDaqDirector") <<
"boost::asio::read_until error -:" << ec;
1695 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1697 edm::LogWarning(
"EvFDaqDirector") <<
"socket shutdown error -:" << ec;
1722 uint16_t& rawHeaderSize,
1723 int32_t& serverEventsInNewFile,
1724 int64_t& fileSizeFromMetadata,
1733 int stopFileLS = -1;
1736 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1737 if (stopFileCheck == 0)
1743 if (stopFileLS >= 0 && (
int)
ls >= stopFileLS)
1747 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: "
1761 timeval ts_lockbegin;
1762 gettimeofday(&ts_lockbegin,
nullptr);
1765 uint32_t serverLS, closedServerLS;
1766 unsigned int serverHttpStatus;
1773 int maxLS = stopFileLS < 0 ? -1 :
std::max(stopFileLS, (
int)currentLumiSection);
1774 bool rawHeader =
false;
1776 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1786 if (currentLumiSection == 0) {
1792 if (closedServerLS >= currentLumiSection) {
1794 for (uint32_t
i =
std::max(currentLumiSection, 1
U);
i <= closedServerLS;
i++)
1799 bool fileFound =
true;
1803 serverEventsInNewFile =
1804 grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS,
false);
1806 serverEventsInNewFile =
grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1809 if (serverEventsInNewFile < 0 && rawFd != -1) {
1830 if (currentLumiSection == 0) {
1840 if (closedServerLS >= currentLumiSection) {
1843 for (uint32_t
i =
std::max(currentLumiSection, 1
U);
i <= closedServerLS;
i++)
1851 else if (fileStatus ==
newFile) {
1854 }
else if (fileStatus ==
noFile) {
1858 edm::LogWarning(
"EvFDaqDirector") <<
"Server reported LS " << serverLS
1859 <<
" which is smaller than currently open LS " <<
ls <<
". Ignoring response";
1871 if (!std::filesystem::is_directory(openPath)) {
1872 LogDebug(
"EvFDaqDirector") <<
"<open> FU dir not found. Creating... -:" << openPath.string();
1873 std::filesystem::create_directories(openPath);
1878 std::ifstream ij(
file);
1882 if (!
reader.parse(ij, deserializeRoot)) {
1883 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" <<
file;
1887 int ret = deserializeRoot.
get(
"lastLS",
"").
asInt();
1897 std::stringstream
ss;
1898 ss << fileprefix << std::setfill(
'0') << std::setw(4) << lscount <<
"_EoLS.jsn";
1916 std::vector<std::string>
destinations = tsPset.getParameter<std::vector<std::string>>(
"destinations");
1918 destinationsVal.append(
dest);
1919 (*transferSystemJson_)[
"destinations"] = destinationsVal;
1922 std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>(
"transferModes");
1923 for (
auto&
mode : modes)
1925 (*transferSystemJson_)[
"transferModes"] = modesVal;
1927 for (
auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1928 if (psKeyItr->first !=
"destinations" && psKeyItr->first !=
"transferModes") {
1931 for (
auto&
mode : modes) {
1933 if (!streamDef.
existsAs<std::vector<std::string>>(
mode,
true))
1935 <<
" Missing transfer system specification for -:" << psKeyItr->first <<
" (transferMode " <<
mode
1937 std::vector<std::string> streamDestinations = streamDef.
getParameter<std::vector<std::string>>(
mode);
1941 if (streamDestinations.empty())
1943 <<
" Missing transter system destination(s) for -: " << psKeyItr->first <<
", mode:" <<
mode;
1945 for (
auto& sdest : streamDestinations) {
1946 bool sDestValid =
false;
1947 sDestsValue.append(sdest);
1954 <<
" Invalid transter system destination specified for -: " << psKeyItr->first <<
", mode:" <<
mode
1955 <<
", dest:" << sdest;
1957 streamVal[
mode] = sDestsValue;
1959 (*transferSystemJson_)[psKeyItr->first] = streamVal;
1964 throw cms::Exception(
"EvFDaqDirector") <<
"transferSystem PSet not found";
1971 streamRequestName =
stream;
1973 std::stringstream
msg;
1974 msg <<
"Transfer system mode definitions missing for -: " <<
stream;
1985 <<
"Selected mode string is not provided as DaqDirector parameter."
1986 <<
"Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1990 throw cms::Exception(
"EvFDaqDirector") <<
"Selected mode string is not provided as DaqDirector parameter.";
1994 std::stringstream
msg;
2009 ret += (*it).asString();
2024 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2026 ac->second = streamType;
2033 tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2035 return search_ac->second;
2037 edm::LogInfo(
"EvFDaqDirector") <<
" No merging type specified for stream " <<
stream <<
". Using default value";
2039 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2041 ac->second = defaultName;
2048 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2049 close(proc_flag_fd);
2052 struct flock
EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {