12 using std::stringstream;
17 struct flock make_flock(short
type, short whence, off_t
start, off_t len, pid_t
pid)
29 testModeNoBuilderUnit_(
30 pset.getUntrackedParameter<bool> (
"testModeNoBuilderUnit",
34 pset.getUntrackedParameter<std::
string> (
"baseDir",
"/data")
37 pset.getUntrackedParameter<std::
string> (
"buBaseDir",
"/data")
40 pset.getUntrackedParameter<std::
string> (
"smBaseDir",
"/sm")
43 pset.getUntrackedParameter<std::
string> (
"monBaseDir",
47 pset.getUntrackedParameter<bool> (
"directorIsBu",
false)
49 run_(pset.getUntrackedParameter<unsigned int> (
"runNumber",0)),
53 fu_readwritelock_fd_(-1),
54 data_readwrite_fd_(-1),
59 bu_w_monitor_stream(0),
60 bu_t_monitor_stream(0),
63 dirManager_(base_dir_),
69 bu_w_flk( make_flock( F_WRLCK, SEEK_SET, 0, 0, 0 )),
70 bu_r_flk( make_flock( F_RDLCK, SEEK_SET, 0, 0, 0 )),
71 bu_w_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
72 bu_r_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
73 fu_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
74 fu_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() )),
75 data_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
76 data_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
82 ss <<
"run" << std::setfill(
'0') << std::setw(6) <<
run_;
88 gethostname(hostname,32);
91 int retval = mkdir(
base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
92 if (retval != 0 && errno != EEXIST) {
93 throw cms::Exception(
"DaqDirector") <<
" Error checking for base dir "
94 <<
base_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
109 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
110 if (retval != 0 && errno != EEXIST) {
113 <<
" mkdir error:" << strerror(errno) <<
"\n";
117 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
118 if (retval != 0 && errno != EEXIST) {
119 throw cms::Exception(
"DaqDirector") <<
" Error creating bu run open dir "
129 monitor_base_dir_ = ost.str() +
"_OLD";
130 retval = mkdir(monitor_base_dir_.c_str(),
131 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
132 if (retval != 0 && errno != EEXIST) {
134 <<
" Error creating monitor dir " << monitor_base_dir_
135 <<
" mkdir error:" << strerror(errno) <<
"\n";
140 O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
142 edm::LogWarning(
"EvFDaqDirector") <<
"problem with creating filedesc for buwritelock "
145 edm::LogInfo(
"EvFDaqDirector") <<
"creating filedesc for buwritelock "
149 edm::LogWarning(
"EvFDaqDirector")<<
"Error creating write lock stream " << strerror(errno);
152 filename = monitor_base_dir_ +
"/diskmonitor.txt";
155 edm::LogWarning(
"EvFDaqDirector") <<
"Error creating bu write lock stream " << strerror(errno);
169 retval = mkdir(
bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
170 if (retval != 0 && errno != EEXIST) {
171 throw cms::Exception(
"DaqDirector") <<
" Error checking for bu base dir "
172 <<
base_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
198 int retval = mkdir(
run_dir_.c_str(),
199 S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
200 if (retval != 0 && errno != EEXIST) {
202 <<
run_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
207 edm::LogWarning(
"EvFDaqDirector") <<
"DaqDirector Warning checking run dir "
208 <<
run_dir_ <<
" this is not the highest run "
218 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
219 if (retval != 0 && errno != EEXIST) {
220 throw cms::Exception(
"DaqDirector") <<
" Error creating output dir "
221 <<
sm_base_dir_ <<
" mkdir error:" << strerror(errno) <<
"\n";
225 std::string mergedDataDir = mergedRunDir +
"/data";
227 retval = mkdir(mergedRunDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
228 if (retval != 0 && errno != EEXIST) {
230 <<
" Error creating merged Run dir " << mergedDataDir
231 <<
" mkdir error:" << strerror(errno) <<
"\n";
235 = mkdir(mergedDataDir.c_str(),
236 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
237 if (retval != 0 && errno != EEXIST) {
239 <<
" Error creating merged data dir " << mergedDataDir
240 <<
" mkdir error:" << strerror(errno) <<
"\n";
243 retval = mkdir(mergedMonDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
244 if (retval != 0 && errno != EEXIST) {
246 <<
" Error creating merged mon dir " << mergedMonDir
247 <<
" mkdir error:" << strerror(errno) <<
"\n";
294 int retval =
remove(filename.c_str());
296 std::cout <<
"Could not remove used file " << filename <<
" error "
297 << strerror(errno) <<
"\n";
313 <<
"seek on bu write lock for updating failed with error "
316 edm::LogError(
"EvFDaqDirector") <<
"bu write lock stream is invalid " << strerror(errno);
328 edm::LogError(
"EvFDaqDirector") <<
"problem with creating filedesc for fuwritelock "
331 edm::LogInfo(
"EvFDaqDirector") <<
"created filedesc for fureadwritelock "
334 edm::LogInfo(
"EvFDaqDirector") <<
"Reopened the fw FD & STREAM";
339 if(retval!=0)
return false;
348 unsigned int readLs,
readIndex, jumpLs, jumpIndex;
357 &jumpLs, &jumpIndex);
362 bool bumpedOk =
bumpFile(readLs, readIndex, nextFile);
373 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for updating failed with error "
381 readIndex + 1, readLs + 2, readIndex + 1);
394 edm::LogInfo(
"EvFDaqDirector")<<
"Written to file: " << readLs <<
":"
395 << readIndex + 1 <<
" --> " << readLs + 2
396 <<
":" << readIndex + 1;
398 edm::LogInfo(
"EvFDaqDirector")<<
"Written to file: " << readLs <<
":"
402 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for updating failed with error "
403 << strerror(errno) << std::endl;
406 edm::LogError(
"EvFDaqDirector") <<
"seek on fu read/write lock for reading failed with error "
407 << strerror(errno) << std::endl;
409 edm::LogError(
"EvFDaqDirector") <<
"fu read/write lock stream is invalid " << strerror(errno);
424 unsigned int readval;
426 unsigned int *
p = &readval;
433 edm::LogError(
"EvFDaqDirector") <<
"error reading bu lock file " << strerror(errno);
437 std::cout <<
"readbulock returning " << retval << std::endl;
446 lockfile +=
"/bu.lock";
447 bool retval = (stat(lockfile.c_str(), &buf) == 0);
452 std::cout <<
"stat of lockfile returned " << retval << std::endl;
459 lockfile +=
"/fu.lock";
460 bool retval = (stat(lockfile.c_str(), &buf) == 0);
465 std::cout <<
"stat of lockfile returned " << retval << std::endl;
470 unsigned long long totsize,
long long lsusec) {
476 double(totsize) /
double(events) / 1000000.,
477 double(totsize) /
double(lsusec), lsusec);
481 <<
"seek on bu write monitor for updating failed with error "
482 << strerror(errno) << std::endl;
484 std::cout <<
"bu write monitor stream is invalid " << strerror(errno)
497 <<
"seek on disk write monitor for updating failed with error "
498 << strerror(errno) << std::endl;
500 std::cout <<
"disk write monitor stream is invalid " << strerror(errno)
515 std::stringstream ss;
516 unsigned int nextIndex =
index;
521 bool found = (stat(nextFile.c_str(), &buf) == 0);
531 unsigned int startingLumi =
ls;
536 found = (stat(nextFile.c_str(), &buf) == 0);
551 string cpCmd =
"cp " + sourceEol +
" " + destEol;
552 edm::LogInfo(
"EvFDaqDirector") <<
" testmode: Running copy cmd = " << cpCmd;
553 int rc = system(cpCmd.c_str());
555 edm::LogError(
"EvFDaqDirector") <<
" testmode: COPY EOL FAILED!!!!!: " << cpCmd;
570 return highestRunDirPath.filename().string();
575 edm::LogError(
"EvFDaqDirector") <<
"Error creating fu read/write lock stream "
578 edm::LogInfo(
"EvFDaqDirector") <<
"Initializing FU LOCK FILE";
579 unsigned int readLs = 1,
readIndex = 0, jumpLs = 3, jumpIndex = 0;
591 S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
596 edm::LogError(
"EvFDaqDirector") <<
"problem with creating filedesc for fuwritelock "
599 edm::LogInfo(
"EvFDaqDirector") <<
"creating filedesc for fureadwritelock "
610 edm::LogError(
"EvFDaqDirector") <<
"problem with creating filedesc for datamerge "
613 edm::LogInfo(
"EvFDaqDirector") <<
"creating filedesc for datamerge "
627 std::stringstream ss;
629 <<
"_ls" << std::setfill(
'0') << std::setw(4) << ls
630 <<
"_index" << std::setfill(
'0') << std::setw(6) <<
index;
635 std::stringstream ss;
637 <<
"_ls" << std::setfill(
'0') << std::setw(4) << ls
639 <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid();
644 std::stringstream ss;
646 <<
"_ls" << std::setfill(
'0') << std::setw(4) << ls
653 std::stringstream ss;
656 <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid()
662 std::stringstream ss;
663 ss <<
"EoLS_" << std::setfill(
'0') << std::setw(4) << ls <<
".jsn";
668 std::stringstream ss;
669 ss <<
"EoR_" << std::setfill(
'0') << std::setw(6) <<
run_ <<
".jsn";
FILE * bu_t_monitor_stream
tuple start
Check for commandline option errors.
void postEndRun(edm::Run const &run, edm::EventSetup const &es)
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
bool checkDirEmpty(std::string &)
bool createOutputDirectory()
void openFULockfileStream(std::string &fuLockFilePath, bool create)
std::string getInitFilePath(std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string eolsFileName(const unsigned int ls) const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
std::string findHighestRunDirStem()
unsigned long previousFileSize_
std::string getPathForFU() const
std::string monitor_base_dir_
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
bool check(const DataFrame &df, bool capcheck, bool dvercheck)
bool updateFuLock(unsigned int &ls, std::string &nextFile, bool &eorSeen)
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
FILE * maybeCreateAndLockFileHeadForStream(unsigned int ls, std::string &stream)
std::string findHighestRunDir()
void unlockAndCloseMergeStream()
void tryInitializeFuLockFile()
void removeFile(unsigned int ls, unsigned int index)
void preBeginRun(edm::RunID const &id, edm::Timestamp const &ts)
void updateBuLock(unsigned int ls)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
bool testModeNoBuilderUnit_
std::string eorFileName() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile)
std::string findHighestRunDir()
FILE * bu_w_monitor_stream
void writeDiskAndThrottleStat(double, int, int)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void watchPostEndRun(PostEndRun::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag false
struct flock data_rw_fulk
void accummulateFileSize(unsigned long fileSize)
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
void writeLsStatisticsBU(unsigned int, unsigned int, unsigned long long, long long)
std::string bu_run_open_dir_
std::string initFileName(std::string const &stream) const
void watchPreBeginRun(PreBeginRun::slot_type const &iSlot)