23 #include <boost/lexical_cast.hpp> 24 #include <boost/filesystem/fstream.hpp> 25 #include <boost/algorithm/string.hpp> 37 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {
"",
"DAT",
"PB",
"JSNDATA"};
41 base_dir_(pset.getUntrackedParameter<
std::
string> (
"baseDir",
".")),
42 bu_base_dir_(pset.getUntrackedParameter<
std::
string> (
"buBaseDir",
".")),
43 directorBu_(pset.getUntrackedParameter<
bool> (
"directorIsBu",
false)),
44 run_(pset.getUntrackedParameter<unsigned
int> (
"runNumber",0)),
45 useFileBroker_(pset.getUntrackedParameter<
bool> (
"useFileBroker",
false)),
46 fileBrokerHost_(pset.getUntrackedParameter<
std::
string>(
"fileBrokerHost",
"")),
47 fileBrokerPort_(pset.getUntrackedParameter<
std::
string>(
"fileBrokerPort",
"8080")),
48 fileBrokerKeepAlive_(pset.getUntrackedParameter<
bool>(
"fileBrokerKeepAlive",
true)),
49 fileBrokerUseLocalLock_(pset.getUntrackedParameter<
bool>(
"fileBrokerUseLocalLock",
true)),
50 outputAdler32Recheck_(pset.getUntrackedParameter<
bool>(
"outputAdler32Recheck",
false)),
51 requireTSPSet_(pset.getUntrackedParameter<
bool>(
"requireTransfersPSet",
false)),
52 selectedTransferMode_(pset.getUntrackedParameter<
std::
string>(
"selectedTransferMode",
"")),
53 hltSourceDirectory_(pset.getUntrackedParameter<
std::
string>(
"hltSourceDirectory",
"")),
54 fuLockPollInterval_(pset.getUntrackedParameter<unsigned
int>(
"fuLockPollInterval",2000)),
55 mergeTypePset_(pset.getUntrackedParameter<
std::
string>(
"mergeTypePset",
"")),
59 fu_readwritelock_fd_(-1),
60 fulocal_rwlock_fd_(-1),
61 fulocal_rwlock_fd2_(-1),
65 dirManager_(base_dir_),
67 bu_w_flk( make_flock( F_WRLCK, SEEK_SET, 0, 0, 0 )),
68 bu_r_flk( make_flock( F_RDLCK, SEEK_SET, 0, 0, 0 )),
69 bu_w_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
70 bu_r_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
71 fu_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
72 fu_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
82 gethostname(hostname,32);
85 char * fuLockPollIntervalPtr = getenv(
"FFF_LOCKPOLLINTERVAL");
86 if (fuLockPollIntervalPtr) {
91 catch( boost::bad_lexical_cast
const& ) {
97 char * fileBrokerParamPtr = getenv(
"FFF_USEFILEBROKER");
98 if (fileBrokerParamPtr) {
103 catch( boost::bad_lexical_cast
const& ) {
111 if (
stat(
"/etc/appliance/bus.config",&buf)==0) {
123 throw cms::Exception(
"EvFDaqDirector") <<
"No file service or BU data address information";
127 char * startFromLSPtr = getenv(
"FFF_STARTFROMLS");
128 if (startFromLSPtr) {
133 catch( boost::bad_lexical_cast
const& ) {
139 char * fileBrokerUseLockParamPtr = getenv(
"FFF_FILEBROKERUSELOCALLOCK");
140 if (fileBrokerUseLockParamPtr) {
145 catch( boost::bad_lexical_cast
const& ) {
154 std::stringstream ss;
155 ss <<
"run" << std::setfill(
'0') << std::setw(6) <<
run_;
157 ss = std::stringstream();
161 ss = std::stringstream();
166 int retval =
mkdir(
base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
167 if (retval != 0 && errno != EEXIST) {
168 throw cms::Exception(
"DaqDirector") <<
" Error checking for base dir -: " 169 <<
base_dir_ <<
" mkdir error:" << strerror(errno);
175 S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
176 if (retval != 0 && errno != EEXIST) {
177 throw cms::Exception(
"DaqDirector") <<
" Error creating run dir -: " 178 <<
run_dir_ <<
" mkdir error:" << strerror(errno);
186 fulocal_rwlock_fd_ = open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
188 throw cms::Exception(
"DaqDirector") <<
" Error creating/opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
189 chmod(fulocal_lock_.c_str(),0777);
192 fulocal_rwlock_fd2_ = open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
194 throw cms::Exception(
"DaqDirector") <<
" Error opening a local lock file -: " << fulocal_lock_.c_str() <<
" : " << strerror(errno);
207 S_IRWXU | S_IRWXG | S_IRWXO);
208 if (retval != 0 && errno != EEXIST) {
211 <<
" mkdir error:" << strerror(errno) <<
"\n";
215 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
216 if (retval != 0 && errno != EEXIST) {
217 throw cms::Exception(
"DaqDirector") <<
" Error creating bu run open dir -: " 224 O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
226 edm::LogWarning(
"EvFDaqDirector") <<
"problem with creating filedesc for buwritelock -: " 229 edm::LogInfo(
"EvFDaqDirector") <<
"creating filedesc for buwritelock -: " 233 edm::LogWarning(
"EvFDaqDirector")<<
"Error creating write lock stream -: " << strerror(errno);
248 retval =
mkdir(tmphltdir.c_str(),S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
249 if (retval != 0 && errno != EEXIST)
251 <<
" Error creating bu run dir -: " << hltdir
252 <<
" mkdir error:" << strerror(errno) <<
"\n";
254 boost::filesystem::copy_file(
hltSourceDirectory_+
"/HltConfig.py",tmphltdir+
"/HltConfig.py");
256 boost::filesystem::copy_file(
hltSourceDirectory_+
"/fffParameters.jsn",tmphltdir+
"/fffParameters.jsn");
258 boost::filesystem::rename(tmphltdir,hltdir);
270 if (retval != 0 && errno != EEXIST) {
271 throw cms::Exception(
"DaqDirector") <<
" Error checking for bu base dir -: " 272 <<
bu_base_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
283 std::stringstream sstp;
291 if (!
stat(defPath.c_str(), &statbuf))
292 edm::LogInfo(
"EvFDaqDirector") <<
"found JSD file in ramdisk -: " << defPath;
295 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
296 defPath =
std::string(getenv(
"CMSSW_BASE")) +
"/" + defPathSuffix;
297 if (
stat(defPath.c_str(), &statbuf)) {
298 defPath =
std::string(getenv(
"CMSSW_RELEASE_BASE")) +
"/" + defPathSuffix;
299 if (
stat(defPath.c_str(), &statbuf)) {
300 defPath = defPathSuffix;
306 DataPointDefinition::getDataPointDefinitionFor(defPath,
dpd_,&defLabel);
316 boost::system::error_code
ec;
317 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
345 desc.
setComment(
"Service used for file locking arbitration and for propagating information between other EvF components");
348 desc.
addUntracked<
unsigned int> (
"runNumber",0)->setComment(
"Run Number in ramdisk to open");
349 desc.
addUntracked<
bool> (
"useFileBroker",
false)->setComment(
"Use BU file service to grab input data instead of NFS file locking");
352 desc.
addUntracked<
bool> (
"fileBrokerKeepAlive",
true)->setComment(
"Use keep alive to avoid using large number of sockets");
353 desc.
addUntracked<
bool> (
"fileBrokerUseLocalLock",
true)->setComment(
"Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
354 desc.
addUntracked<
bool>(
"outputAdler32Recheck",
false)->setComment(
"Check Adler32 of per-process output files while micro-merging");
355 desc.
addUntracked<
bool>(
"requireTransfersPSet",
false)->setComment(
"Require complete transferSystem PSet in the process configuration");
356 desc.
addUntracked<
std::string>(
"selectedTransferMode",
"")->setComment(
"Selected transfer mode (choice in Lvl0 propagated as Python parameter");
357 desc.
addUntracked<
unsigned int>(
"fuLockPollInterval",2000)->setComment(
"Lock polling interval in microseconds for the input directory file lock");
358 desc.
addUntracked<
std::string>(
"mergingPset",
"")->setComment(
"Name of merging PSet to look for merging type definitions for streams");
360 descriptions.
add(
"EvFDaqDirector", desc);
376 <<
run_dir_ <<
". This is not the highest run " 395 edm::LogWarning(
"EvFDaqDirector") <<
" Handles to check for files to delete were not set by the input source...";
402 if (it->second->lumi_ == ls) {
404 LogDebug(
"EvFDaqDirector") <<
"Deleting input file -:" << it->second->fileName_;
409 catch (boost::filesystem::filesystem_error
const& ex)
411 edm::LogError(
"EvFDaqDirector") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() <<
". Trying again.";
416 catch (
const boost::filesystem::filesystem_error&) {}
420 edm::LogError(
"EvFDaqDirector") <<
" - deleteFile std::exception CAUGHT -: " << ex.what() <<
". Trying again.";
531 int retval =
remove(filename.c_str());
533 edm::LogError(
"EvFDaqDirector") <<
"Could not remove used file -: " << filename <<
". error = " 545 int lock_attempts = 0;
546 long total_lock_attempts = 0;
552 if (stopFileCheck==0 || stopFilePidCheck==0) {
553 if (stopFileCheck==0)
562 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
568 timeval ts_lockbegin;
569 gettimeofday(&ts_lockbegin,
nullptr);
578 if (lock_attempts>5000000 || errno==116) {
580 edm::LogWarning(
"EvFDaqDirector") <<
"Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
582 edm::LogWarning(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 seconds. Checking if run directory and fu.lock file are present -: errno " 583 << errno <<
":"<< strerror(errno) << std::endl;
587 edm::LogWarning(
"EvFDaqDirector") <<
"Detected local EoLS for lumisection "<<
ls ;
597 if (total_lock_attempts>5*60000000) {
598 edm::LogError(
"EvFDaqDirector") <<
"Unable to obtain a lock for 5 minutes. Stopping polling activity.";
604 gettimeofday(&ts_lockend,
nullptr);
605 long deltat = (ts_lockend.tv_usec-ts_lockbegin.tv_usec) + (ts_lockend.tv_sec-ts_lockbegin.tv_sec)*1000000;
606 if (deltat>0.) lockWaitTime=deltat;
608 if(retval!=0)
return fileStatus;
612 gettimeofday(&ts_lockend,0);
616 int fu_readwritelock_fd2 = open(
fulockfile_.c_str(), O_RDWR, S_IRWXU);
617 if (fu_readwritelock_fd2 == -1)
619 <<
" create. error:" << strerror(errno);
621 FILE * fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2,
"r+");
624 if (fu_rw_lock_stream2 !=
nullptr) {
628 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
632 fscanf(fu_rw_lock_stream2,
"%u %u", &readLs, &readIndex);
635 unsigned int currentLs = readLs;
636 bool bumpedOk =
false;
639 if (ls && ls+1 < currentLs) ls++;
642 bumpedOk =
bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
644 if (ls && readLs>currentLs && currentLs > ls) {
652 if (ls==0 && readLs>currentLs) {
667 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
669 ftruncate(fu_readwritelock_fd2, 0);
671 fprintf(fu_rw_lock_stream2,
"%u %u", readLs, readIndex + 1);
672 fflush(fu_rw_lock_stream2);
673 fsync(fu_readwritelock_fd2);
675 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" << readIndex + 1;
678 throw cms::Exception(
"EvFDaqDirector") <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
681 else if (currentLs < readLs) {
683 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
685 ftruncate(fu_readwritelock_fd2, 0);
687 fprintf(fu_rw_lock_stream2,
"%u %u", readLs, readIndex);
688 fflush(fu_rw_lock_stream2);
689 fsync(fu_readwritelock_fd2);
690 LogDebug(
"EvFDaqDirector") <<
"Written to file -: " << readLs <<
":" <<
readIndex;
693 throw cms::Exception(
"EvFDaqDirector") <<
"seek on fu read/write lock for updating failed with error " << strerror(errno);
697 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for reading failed with error " << strerror(errno);
700 edm::LogError(
"EvFDaqDirector") <<
"fu read/write lock stream is invalid " << strerror(errno);
702 fclose(fu_rw_lock_stream2);
705 timeval ts_preunlock;
706 gettimeofday(&ts_preunlock,0);
707 int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
708 double locked_period=locked_period_int+double(ts_preunlock.tv_usec - ts_lockend.tv_usec)/1000000;
717 if (retvalu==-1)
edm::LogError(
"EvFDaqDirector") <<
"Error unlocking the fu.lock " << strerror(errno);
720 edm::LogDebug(
"EvFDaqDirector") <<
"Waited during lock -: " << locked_period <<
" seconds";
723 if ( fileStatus ==
noFile ) {
728 if (stopFileLS>=0 && (
int)ls > stopFileLS) {
729 edm::LogInfo(
"EvFDaqDirector") <<
"Reached maximum lumisection set by hltd";
738 boost::filesystem::ifstream ij(BUEoLSFile);
742 if (!reader.
parse(ij, deserializeRoot)) {
743 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" << BUEoLSFile;
756 while (!def.empty()) {
758 if (def.find(
'/')==0)
763 if (
stat(fullpath.c_str(), &buf) == 0) {
766 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
771 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
773 for (
unsigned int i=0;
i<eolsDpd.
getNames().size();
i++)
780 if (def.size()<=1 || def.find(
'/')==std::string::npos) {
784 def = def.substr(def.find(
'/')+1);
791 edm::LogError(
"EvFDaqDirector") <<
" error reading number of files from BU JSON -: " << BUEoLSFile;
794 return boost::lexical_cast<
int>(
data);
808 if (maxLS>=0 && ls > (
unsigned int)maxLS)
return false;
811 std::stringstream ss;
812 unsigned int nextIndex =
index;
817 if (
stat(nextFile.c_str(), &buf) == 0) {
826 bool eolFound = (
stat(BUEoLSFile.c_str(), &buf) == 0);
830 if (
stat(nextFile.c_str(), &buf) == 0) {
837 if (indexFilesInLS < 0)
842 if ((
int)index<indexFilesInLS) {
844 edm::LogError(
"EvFDaqDirector") <<
"Potential miss of index file in LS -: " << ls <<
". Missing " 845 << nextFile <<
" because " << indexFilesInLS-1 <<
" is the highest index expected. Will not update fu.lock file";
854 if (maxLS>=0 && ls > (
unsigned int)maxLS)
return false;
857 if (
stat(nextFile.c_str(), &buf) == 0) {
868 eolFound = (
stat(BUEoLSFile.c_str(), &buf) == 0);
877 edm::LogError(
"EvFDaqDirector") <<
"Error creating fu read/write lock stream " 880 edm::LogInfo(
"EvFDaqDirector") <<
"Initializing FU LOCK FILE";
889 S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
896 <<
" create:" << create <<
" error:" << strerror(errno);
898 LogDebug(
"EvFDaqDirector") <<
"creating filedesc for fureadwritelock -: " 903 edm::LogError(
"EvFDaqDirector") <<
"problem with opening fuwritelock file stream -: " << strerror(errno);
941 if (checkIfExists==
false ||
stat(fuBoLS.c_str(), &buf) != 0) {
942 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
948 if ( currentLumiSection > 0) {
952 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
954 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
959 else if (doCreateBoLS) {
969 std::ostringstream fileNameWithPID;
971 << std::setfill(
'0') << std::setw(5) <<
pid_ <<
".jsn";
976 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " << jsonDestPath;
980 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY))< 0) {
981 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file -: " << jsonSourcePath <<
" : " << strerror(errno);
983 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY))< 0) {
984 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open input file (on retry) -: " << jsonSourcePath <<
" : " << strerror(errno);
985 if (errno==ENOENT) fileFound=
false;
990 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
991 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
992 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode))< 0)
995 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - destination file already exists -: " << jsonDestPath <<
" : ";
999 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file -: " << jsonDestPath <<
" : " << strerror(errno);
1001 struct stat out_stat;
1002 if (
stat(jsonDestPath.c_str(),&out_stat)==0) {
1003 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1004 if (unlink(jsonDestPath.c_str())==-1) {
1005 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to remove -: " << jsonDestPath <<
" : "<< strerror(errno);
1008 if ((
outfile = ::open(jsonDestPath.c_str(), oflag, omode))< 0) {
1009 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to open output file (on retry) -: " << jsonDestPath <<
" : " << strerror(errno);
1015 const std::size_t buf_sz = 512;
1016 std::size_t tot_written = 0;
1017 std::unique_ptr<char> buf(
new char [buf_sz]);
1019 ssize_t sz, sz_read=1, sz_write;
1020 while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0)
1024 assert(sz_read - sz_write > 0);
1025 if ((sz = ::
write(
outfile, buf.get() + sz_write,sz_read - sz_write)) < 0)
1033 }
while (sz_write < sz_read);
1038 if (tot_written>0) {
1040 if (unlink(jsonSourcePath.c_str()) == -1) {
1041 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to remove -: " << jsonSourcePath <<
" : "<< strerror(errno);
1045 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - failed to copy json file or file was empty -: " << jsonSourcePath;
1053 std::stringstream ss;
1056 if (tot_written<=buf_sz) {
1057 result = reader.
parse(buf.get(), deserializeRoot);
1062 boost::filesystem::ifstream ij(jsonDestPath);
1065 catch (boost::filesystem::filesystem_error
const& ex) {
1066 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1069 result = reader.
parse(ss.str(), deserializeRoot);
1072 if (tot_written<=buf_sz) ss << buf.get();
1073 edm::LogError(
"EvFDaqDirector") <<
"Failed to deserialize JSON file -: " << jsonDestPath
1075 <<
"CONTENT:\n" << ss.str()<<
".";
1095 edm::LogError(
"EvFDaqDirector::grabNextJsonFile") <<
"grabNextJsonFile - " <<
1096 " error reading number of events from BU JSON; No input value. data -: " <<
data;
1102 fileSizeFromJson=-1;
1108 fileSizeFromJson = boost::lexical_cast<
long>(
data);
1110 catch( boost::bad_lexical_cast
const& ) {
1112 edm::LogWarning(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of Bytes from BU JSON. " 1113 <<
"Input value is -: " <<
data;
1119 return boost::lexical_cast<
int>(
data);
1121 catch( boost::bad_lexical_cast
const&
e) {
1122 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - error parsing number of events from BU JSON. " 1123 <<
"Input value is -: " <<
data;
1125 catch (std::runtime_error
const& e)
1128 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - std::runtime_error exception -: " << e.what();
1133 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1138 edm::LogError(
"EvFDaqDirector") <<
"grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1152 std::ostringstream fileNameWithPID;
1153 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" 1154 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
1155 jsonDestPath /= fileNameWithPID.str();
1157 LogDebug(
"EvFDaqDirector") <<
"JSON rename -: " << jsonSourcePath <<
" to " 1162 catch (boost::filesystem::filesystem_error
const& ex)
1165 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1176 catch (boost::filesystem::filesystem_error
const& ex)
1179 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1184 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
1187 boost::filesystem::ifstream ij(jsonDestPath);
1191 std::stringstream ss;
1193 if (!reader.
parse(ss.str(), deserializeRoot)) {
1195 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1197 <<
"CONTENT:\n" << ss.str()<<
".";
1198 throw std::runtime_error(
"Cannot deserialize input JSON file");
1217 throw cms::Exception(
"EvFDaqDirector::grabNextJsonFileUnlock") <<
1218 " error reading number of events from BU JSON -: No input value " <<
data;
1220 return boost::lexical_cast<
int>(
data);
1222 catch (boost::filesystem::filesystem_error
const& ex)
1226 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1228 catch (std::runtime_error
const&
e)
1232 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile runtime Exception -: " << e.what();
1234 catch( boost::bad_lexical_cast
const& ) {
1235 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile error parsing number of events from BU JSON. " 1236 <<
"Input value is -: " <<
data;
1242 edm::LogError(
"EvFDaqDirector") <<
"grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1249 uint32_t& serverLS, uint32_t& closedServerLS,
1253 serverError =
false;
1255 boost::system::error_code
ec;
1270 boost::asio::streambuf request;
1271 std::ostream request_stream(&request);
1274 std::stringstream spath;
1275 spath << path <<
"&stopls=" << maxLS;
1279 request_stream <<
"GET " << path <<
" HTTP/1.1\r\n";
1281 request_stream <<
"Accept: */*\r\n";
1282 request_stream <<
"Connection: keep-alive\r\n\r\n";
1287 edm::LogInfo(
"EvFDaqDirector") <<
"reconnecting socket on received connection_reset";
1302 boost::asio::streambuf response;
1303 boost::asio::read_until(*
socket_, response,
"\r\n",ec);
1310 std::istream response_stream(&response);
1313 response_stream >> http_version;
1315 response_stream >> serverHttpStatus;
1318 std::getline(response_stream, status_message);
1319 if (!response_stream || http_version.substr(0, 5) !=
"HTTP/")
1325 if (serverHttpStatus != 200)
1327 edm::LogWarning(
"EvFDaqDirector") <<
"Response returned with status code " << serverHttpStatus;
1334 while (std::getline(response_stream, header) && header !=
"\r") {
1338 std::map<std::string,std::string> serverMap;
1339 while (std::getline(response_stream, fileInfo) && fileInfo !=
"\r") {
1340 auto pos=fileInfo.find(
"=");
1341 if (
pos==std::string::npos)
continue;
1342 auto stitle = fileInfo.substr(0,
pos);
1343 auto svalue = fileInfo.substr(
pos+1);
1344 serverMap[stitle]=svalue;
1348 auto server_version = serverMap.find(
"version");
1349 assert(server_version!=serverMap.end());
1351 auto server_run = serverMap.find(
"runnumber");
1352 assert(server_run!=serverMap.end());
1355 auto server_state = serverMap.find(
"state");
1356 assert(server_state!=serverMap.end());
1358 auto server_eols = serverMap.find(
"lasteols");
1359 assert(server_eols!=serverMap.end());
1361 auto server_ls = serverMap.find(
"lumisection");
1365 if (server_ls!=serverMap.end())
1368 serverLS = closedServerLS+1;
1371 if (s_state ==
"STARTING")
1373 auto server_file = serverMap.find(
"file");
1374 assert(server_file==serverMap.end());
1376 edm::LogInfo(
"EvFDaqDirector") <<
"Got STARTING notification with last EOLS " << closedServerLS;
1378 else if (s_state==
"READY")
1380 auto server_file = serverMap.find(
"file");
1381 if (server_file==serverMap.end()) {
1383 if (serverLS<=closedServerLS) serverLS=closedServerLS+1;
1385 edm::LogInfo(
"EvFDaqDirector") <<
"Got READY notification with last EOLS " << closedServerLS <<
" and no new file";
1390 auto server_fileprefix = serverMap.find(
"fileprefix");
1392 if (server_fileprefix!=serverMap.end()) {
1393 auto pssize = server_fileprefix->second.size();
1394 if (pssize>1 && server_fileprefix->second[0]==
'"' && server_fileprefix->second[pssize-1]==
'"')
1395 fileprefix = server_fileprefix->second.substr(1,pssize-2);
1397 fileprefix = server_fileprefix->second;
1401 auto ssize = server_file->second.size();
1402 if (ssize>1 && server_file->second[0]==
'"' && server_file->second[ssize-1]==
'"')
1403 filestem = server_file->second.substr(1,ssize-2);
1405 filestem = server_file->second;
1406 assert(!filestem.empty());
1408 filestem =
bu_run_dir_ +
"/" + fileprefix + filestem;
1409 nextFileJson = filestem +
".jsn";
1411 edm::LogInfo(
"EvFDaqDirector") <<
"Got READY notification with last EOLS " << closedServerLS <<
" new LS " << serverLS <<
" file:" << filestem;
1414 else if (s_state==
"EOLS") {
1415 serverLS=closedServerLS+1;
1416 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOLS notification with last EOLS " << closedServerLS;
1419 else if (s_state==
"EOR") {
1421 edm::LogInfo(
"EvFDaqDirector") <<
"Got EOR notification with last EOLS " << closedServerLS;
1424 else if (s_state==
"NORUN") {
1425 auto err_msg = serverMap.find(
"errormessage");
1426 if (err_msg!=serverMap.end())
1427 edm::LogWarning(
"EvFDaqDirector") <<
"Server NORUN -:" << server_state->second <<
" : " << err_msg->second;
1433 else if (s_state==
"ERROR") {
1434 auto err_msg = serverMap.find(
"errormessage");
1435 if (err_msg!=serverMap.end())
1436 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second <<
" : " << err_msg->second;
1438 edm::LogWarning(
"EvFDaqDirector") <<
"Server error -:" << server_state->second;
1443 edm::LogWarning(
"EvFDaqDirector") <<
"Unknown Server state -:" << server_state->second;
1450 while (boost::asio::read(*
socket_, response,
1451 boost::asio::transfer_at_least(1), ec)) {
1453 if (ec != boost::asio::error::eof) {
1468 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1491 int& serverEventsInNewFile, int64_t& fileSizeFromJson,
uint64_t& thisLockWaitTimeUs) {
1500 int stopFileLS = -1;
1503 if (stopFileCheck==0 || stopFilePidCheck==0) {
1504 if (stopFileCheck==0)
1513 edm::LogWarning(
"EvFDaqDirector") <<
"Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
1528 timeval ts_lockbegin;
1529 gettimeofday(&ts_lockbegin,
nullptr);
1532 uint32_t serverLS, closedServerLS;
1533 unsigned int serverHttpStatus;
1540 int maxLS = stopFileLS < 0 ? -1:
std::max(stopFileLS,(
int)currentLumiSection);
1541 fileStatus =
contactFileBroker(serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, maxLS);
1551 if (currentLumiSection==0) {
1560 if (closedServerLS>=currentLumiSection) {
1561 for (uint32_t
i=
std::max(currentLumiSection,1
U);
i<=closedServerLS;
i++)
1566 bool fileFound=
true;
1569 serverEventsInNewFile =
grabNextJsonFile(nextFileJson,nextFileRaw,fileSizeFromJson,fileFound);
1587 ls =
std::max(currentLumiSection,serverLS);
1588 else if (fileStatus==
newFile) {
1589 assert(serverLS>=ls);
1592 else if (fileStatus==
noFile) {
1596 edm::LogWarning(
"EvFDaqDirector") <<
"Server reported LS " << serverLS <<
" which is smaller than currently open LS " << ls <<
". Ignoring response";
1608 if (!boost::filesystem::is_directory(openPath)) {
1609 LogDebug(
"EvFDaqDirector") <<
"<open> FU dir not found. Creating... -:" << openPath.string();
1610 boost::filesystem::create_directories(openPath);
1617 boost::filesystem::ifstream ij(file);
1621 if (!reader.
parse(ij, deserializeRoot)) {
1622 edm::LogError(
"EvFDaqDirector") <<
"Cannot deserialize input JSON file -:" <<
file;
1626 int ret = deserializeRoot.
get(
"lastLS",
"").
asInt();
1637 std::stringstream ss;
1638 ss << fileprefix << std::setfill(
'0') << std::setw(4) << lscount <<
"_EoLS.jsn";
1639 fullpath = ss.str();
1642 while(
stat(fullpath.c_str(),&buf)==0);
1658 std::vector<std::string>
destinations = tsPset.getParameter<std::vector<std::string>>(
"destinations");
1659 for (
auto &
dest: destinations) destinationsVal.append(
dest);
1660 (*transferSystemJson_)[
"destinations"]=destinationsVal;
1663 std::vector<std::string> modes = tsPset.getParameter< std::vector<std::string> >(
"transferModes");
1665 (*transferSystemJson_)[
"transferModes"]=modesVal;
1667 for (
auto psKeyItr =tsPset.psetTable().begin();psKeyItr!=tsPset.psetTable().end(); ++ psKeyItr) {
1668 if (psKeyItr->first!=
"destinations" && psKeyItr->first!=
"transferModes") {
1671 for (
auto &
mode : modes) {
1673 if (!streamDef.
existsAs<std::vector<std::string>>(
mode,
true))
1674 throw cms::Exception(
"EvFDaqDirector") <<
" Missing transfer system specification for -:" << psKeyItr->first <<
" (transferMode " <<
mode <<
")";
1675 std::vector<std::string> streamDestinations = streamDef.
getParameter<std::vector<std::string>>(
mode);
1679 if (streamDestinations.empty())
1680 throw cms::Exception(
"EvFDaqDirector") <<
" Missing transter system destination(s) for -: "<< psKeyItr->first <<
", mode:" <<
mode;
1682 for (
auto & sdest:streamDestinations) {
1683 bool sDestValid=
false;
1684 sDestsValue.append(sdest);
1685 for (
auto &
dest: destinations) {
1686 if (
dest==sdest) sDestValid=
true;
1689 throw cms::Exception(
"EvFDaqDirector") <<
" Invalid transter system destination specified for -: "<< psKeyItr->first <<
", mode:" <<
mode <<
", dest:"<<sdest;
1691 streamVal[
mode]=sDestsValue;
1693 (*transferSystemJson_)[psKeyItr->first] = streamVal;
1699 throw cms::Exception(
"EvFDaqDirector") <<
"transferSystem PSet not found";
1707 streamRequestName = stream;
1709 std::stringstream
msg;
1710 msg <<
"Transfer system mode definitions missing for -: " << stream;
1714 edm::LogWarning(
"EvFDaqDirector") << msg.str() <<
" (permissive mode)";
1720 edm::LogWarning(
"EvFDaqDirector") <<
"Selected mode string is not provided as DaqDirector parameter." 1721 <<
"Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1725 throw cms::Exception(
"EvFDaqDirector") <<
"Selected mode string is not provided as DaqDirector parameter.";
1730 std::stringstream
msg;
1731 msg <<
"Selected transfer mode " <<
selectedTransferMode_ <<
" is not specified for stream " << streamRequestName;
1735 edm::LogWarning(
"EvFDaqDirector") << msg.str() <<
" (permissive mode)";
1744 if (!ret.empty()) ret +=
",";
1745 ret+=(*it).asString();
1760 tbb::concurrent_hash_map<std::string,std::string>::accessor ac;
1762 ac->second = streamType;
1770 tbb::concurrent_hash_map<std::string,std::string>::const_accessor search_ac;
1772 return search_ac->second;
1774 edm::LogInfo(
"EvFDaqDirector") <<
" No merging type specified for stream " << stream <<
". Using default value";
1776 tbb::concurrent_hash_map<std::string,std::string>::accessor ac;
1778 ac->second = defaultName;
1785 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1786 close(proc_flag_fd);
unsigned int maxNumberOfThreads() const
T getParameter(std::string const &) const
std::string getStreamDestinations(std::string const &stream) const
Value get(UInt index, const Value &defaultValue) const
std::vector< std::string > & getData()
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPreallocate(Preallocate::slot_type const &iSlot)
const_iterator begin() const
bool existsAs(std::string const ¶meterName, bool trackiness=true) const
checks if a parameter exists as a given type
boost::asio::io_service io_service_
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
void setAllowAnything()
allow any parameter label/value pairs
jsoncollector::DataPointDefinition * dpd_
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
ParameterSet const & getParameterSet(ParameterSetID const &id)
pthread_mutex_t init_lock_
LuminosityBlockID const & luminosityBlockID() const
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Value & append(const Value &value)
Append value to array at the end.
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, int maxLS)
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
ParameterSetID const & parameterSetID() const
void createRunOpendirMaybe()
unsigned int maxNumberOfStreams() const
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void setComment(std::string const &value)
std::mutex * fileDeleteLockPtr_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string mergeTypePset_
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
void tryInitializeFuLockFile()
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void openFULockfileStream(bool create)
int getNFilesFromEoLS(std::string BUEoLSFile)
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int getLumisectionToStart() const
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
void checkMergeTypePSet(edm::ProcessContext const &pc)
ParameterSet const & getParameterSet(std::string const &) const
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void deserialize(Json::Value &root) override
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
LuminosityBlockNumber_t luminosityBlock() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::string & getDefinition()
def remove(d, key, TELL=False)
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string findHighestRunDir()
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
bool fileBrokerUseLocalLock_
void postEndRun(edm::GlobalContext const &globalContext)
Unserialize a JSON document into a Value.
static const std::vector< std::string > MergeTypeNames_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
const_iterator end() const
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Iterator for object and array value.
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
bool fileBrokerKeepAlive_
std::string getRunOpenDirPath() const
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
std::string getFormatedErrorMessages() const
Returns a user-friendly string that lists errors in the parsed document.
std::string getFFFParamsFilePathOnBU() const
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string getEoRFilePathOnFU() const
std::string bu_run_open_dir_
array value (ordered list)