17 #include <sys/types.h> 25 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L) 29 #define SIZE_MAX ((size_t)-1) 32 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2)) 35 ssize_t getline(
char** lineptr,
size_t*
n, FILE*
fp) {
44 if (*lineptr ==
NULL || *
n == 0) {
47 if (*lineptr ==
NULL) {
63 if (cur_len + 1 >= *
n) {
65 size_t needed = 2 * *
n + 1;
68 if (needed_max < needed)
70 if (cur_len + 1 >= needed) {
75 new_lineptr = (
char*)
realloc(*lineptr, needed);
76 if (new_lineptr ==
NULL) {
81 *lineptr = new_lineptr;
85 (*lineptr)[cur_len] =
i;
91 (*lineptr)[cur_len] =
'\0';
102 gettimeofday(&
t,
nullptr);
105 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
109 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
117 inprocessDir_(
pset.getParameter<
std::
string>(
"inprocessDir")),
118 processedDir_(
pset.getParameter<
std::
string>(
"processedDir")),
119 corruptedDir_(
pset.getParameter<
std::
string>(
"corruptedDir")),
120 tokenFile_(
pset.getUntrackedParameter<
std::
string>(
"tokenFile",
"watcherSourceToken")),
121 timeOut_(
pset.getParameter<
int>(
"timeOutInSec")),
123 verbosity_(
pset.getUntrackedParameter<
int>(
"verbosity", 0)) {
130 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
138 for (
unsigned i = 0;
i <
dirs.size(); ++
i) {
140 struct stat fileStat;
141 if (0 ==
stat(
dir.c_str(), &fileStat)) {
142 if (!S_ISDIR(fileStat.st_mode)) {
143 throw cms::Exception(
"[WatcherSource]") <<
"File " <<
dir <<
" exists but is not a directory " 147 if (0 !=
mkdir(
dir.c_str(), 0755)) {
148 throw cms::Exception(
"[WatcherSource]") <<
"Failed to create directory " <<
dir <<
" for writing data.";
173 <<
"received wrong message type: expected INIT, got " <<
header->code() <<
"\n";
195 char* lineptr =
nullptr;
197 static stringstream
cmd;
198 static bool cmdSet =
false;
199 static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
208 cmd <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
212 if (getcwd(curDir,
sizeof(curDir)) ==
nullptr) {
213 throw cms::Exception(
"WatcherSource") <<
"Failed to retreived working directory path: " << strerror(errno);
226 cout <<
"[WatcherSource " <<
now() <<
"]" 227 <<
" Command to retrieve input files: " <<
cmd.str() <<
"\n";
237 bool waiting =
false;
238 static bool firstWait =
true;
250 FILE*
s = popen(
cmd.str().c_str(),
"r");
252 throw cms::Exception(
"WatcherSource") <<
"Failed to retrieve list of input file: " << strerror(errno);
257 if ((len = getline(&lineptr, &
n,
s)) > 0) {
259 lineptr[len - 1] = 0;
261 if (lineptr[0] !=
'/') {
274 cout <<
"[WatcherSource " <<
now() <<
"]" 275 <<
" File to process: '" <<
fileName <<
"'\n";
283 cout <<
"[WatcherSource " <<
now() <<
"]" 284 <<
" No file found. Waiting for new file...\n";
287 gettimeofday(&waitStart,
nullptr);
288 }
else if (!firstWait) {
290 gettimeofday(&
t,
nullptr);
291 float dt = (
t.tv_sec - waitStart.tv_sec) * 1. + (
t.tv_usec - waitStart.tv_usec) * 1.e-6;
293 cout <<
"[WatcherSource " <<
now() <<
"]" 294 <<
" Having waited for new file for " << (
int)
dt <<
" sec. " 295 <<
"Timeout exceeded. Exits.\n";
316 time_t
t =
time(
nullptr);
320 cout <<
"file size: " <<
buf.st_size <<
", prev size: " <<
size <<
"\n";
325 if (difftime(
t,
buf.st_mtime) > 60)
330 if (
fd != 0 &&
buf.st_size == 0) {
335 cout <<
"[WatcherSource " <<
now() <<
"]" 336 <<
" Executing " <<
c.str() <<
"\n";
337 int i = system(
c.str().c_str());
340 cout <<
"[WatcherSource " <<
now() <<
"] " 341 <<
"Failed to move empty file '" <<
fileName_ <<
"'" 351 buf1[buf1.size() - 1] = 0;
355 buf2[buf2.size() - 1] = 0;
357 string dirnam(
dirname(&buf1[0]));
358 string filenam(basename(&buf2[0]));
363 cout <<
"[WatcherSource " <<
now() <<
"]" 374 << strerror(errno) <<
" (Error no " << errno <<
")";
379 cout <<
"[WatcherSource " <<
now() <<
"]" 384 ofstream
f(
".watcherfile");
387 cout <<
"[WatcherSource " <<
now() <<
"]" 388 <<
" Failed to open file " <<
fileName_ << endl;
412 cout <<
"[WatcherSource " <<
now() <<
"]" 417 <<
"Failed to move processed file '" <<
fileName_ <<
"'" 418 <<
" to processed directory '" <<
processedDir_ <<
": " << strerror(errno) <<
" (Error no " << errno <<
")";
424 cout <<
"Done at " <<
now() <<
"\n";
std::vector< std::string > filePatterns_
const EventMsgView * getNextEvent()
std::deque< std::string > filesInQueue_
std::string corruptedDir_
std::string inprocessDir_
WatcherStreamFileReader(edm::ParameterSet const &pset)
void free(void *ptr) noexcept
const InitMsgView * getHeader()
std::string processedDir_
edm::StreamerInputFile * getInputFile()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
void * malloc(size_t size) noexcept
void * realloc(void *ptr, size_t size) noexcept
~WatcherStreamFileReader()