17 #include <sys/types.h>
25 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
29 #define SIZE_MAX ((size_t)-1)
32 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
35 ssize_t getline(
char** lineptr,
size_t*
n, FILE*
fp) {
44 if (*lineptr ==
NULL || *
n == 0) {
46 *lineptr = (
char*)malloc(*
n);
47 if (*lineptr ==
NULL) {
63 if (cur_len + 1 >= *
n) {
65 size_t needed = 2 * *
n + 1;
68 if (needed_max < needed)
70 if (cur_len + 1 >= needed) {
75 new_lineptr = (
char*)realloc(*lineptr, needed);
76 if (new_lineptr ==
NULL) {
81 *lineptr = new_lineptr;
85 (*lineptr)[cur_len] =
i;
91 (*lineptr)[cur_len] =
'\0';
102 gettimeofday(&
t,
nullptr);
105 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
109 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
117 inprocessDir_(
pset.getParameter<
std::
string>(
"inprocessDir")),
118 processedDir_(
pset.getParameter<
std::
string>(
"processedDir")),
119 corruptedDir_(
pset.getParameter<
std::
string>(
"corruptedDir")),
120 tokenFile_(
pset.getUntrackedParameter<
std::
string>(
"tokenFile",
"watcherSourceToken")),
121 timeOut_(
pset.getParameter<
int>(
"timeOutInSec")),
123 verbosity_(
pset.getUntrackedParameter<
int>(
"verbosity", 0)) {
130 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
138 for (
unsigned i = 0;
i <
dirs.size(); ++
i) {
140 struct stat fileStat;
141 if (0 ==
stat(
dir.c_str(), &fileStat)) {
142 if (!S_ISDIR(fileStat.st_mode)) {
143 throw cms::Exception(
"[WatcherSource]") <<
"File " <<
dir <<
" exists but is not a directory "
147 if (0 !=
mkdir(
dir.c_str(), 0755)) {
148 throw cms::Exception(
"[WatcherSource]") <<
"Failed to create directory " <<
dir <<
" for writing data.";
153 std::stringstream fileListCmdBuf;
154 fileListCmdBuf.str(
"");
160 fileListCmdBuf <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
163 throw cms::Exception(
"WacherSource",
"filePatterns parameter is empty");
164 char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
165 if (getcwd(curDir,
sizeof(curDir)) ==
nullptr) {
166 throw cms::Exception(
"WatcherSource") <<
"Failed to retreived working directory path: " << strerror(errno);
172 fileListCmdBuf <<
"|";
178 fileListCmdBuf <<
")' | sort";
182 cout <<
"[WatcherSource " <<
now() <<
"]"
183 <<
" Command to retrieve input files: " <<
fileListCmd_ <<
"\n";
205 <<
"received wrong message type: expected INIT, got " <<
header->code() <<
"\n";
227 char* lineptr =
nullptr;
236 bool waiting =
false;
237 static bool firstWait =
true;
251 throw cms::Exception(
"WatcherSource") <<
"Failed to retrieve list of input file: " << strerror(errno);
256 if ((len = getline(&lineptr, &
n,
s)) > 0) {
258 lineptr[len - 1] = 0;
260 if (lineptr[0] !=
'/') {
273 cout <<
"[WatcherSource " <<
now() <<
"]"
274 <<
" File to process: '" <<
fileName <<
"'\n";
282 cout <<
"[WatcherSource " <<
now() <<
"]"
283 <<
" No file found. Waiting for new file...\n";
286 gettimeofday(&waitStart,
nullptr);
287 }
else if (!firstWait) {
289 gettimeofday(&
t,
nullptr);
290 float dt = (
t.tv_sec - waitStart.tv_sec) * 1. + (
t.tv_usec - waitStart.tv_usec) * 1.e-6;
292 cout <<
"[WatcherSource " <<
now() <<
"]"
293 <<
" Having waited for new file for " << (
int)
dt <<
" sec. "
294 <<
"Timeout exceeded. Exits.\n";
315 time_t
t =
time(
nullptr);
319 cout <<
"file size: " <<
buf.st_size <<
", prev size: " <<
size <<
"\n";
324 if (difftime(
t,
buf.st_mtime) > 60)
329 if (
fd != 0 &&
buf.st_size == 0) {
334 cout <<
"[WatcherSource " <<
now() <<
"]"
335 <<
" Excuting " <<
c.str() <<
"\n";
336 int i = system(
c.str().c_str());
339 cout <<
"[WatcherSource " <<
now() <<
"] "
340 <<
"Failed to move empty file '" <<
fileName_ <<
"'"
350 buf1[buf1.size() - 1] = 0;
354 buf2[buf1.size() - 1] = 0;
356 string dirnam(
dirname(&buf1[0]));
357 string filenam(basename(&buf2[0]));
362 cout <<
"[WatcherSource " <<
now() <<
"]"
371 <<
"Failed to move file '" <<
fileName_ <<
"' "
372 <<
"to processing directory " <<
inprocessDir_ <<
": " << strerror(errno);
377 cout <<
"[WatcherSource " <<
now() <<
"]"
382 ofstream
f(
".watcherfile");
385 cout <<
"[WatcherSource " <<
now() <<
"]"
386 <<
" Failed to open file " <<
fileName_ << endl;
402 cout <<
"[WatcherSource " <<
now() <<
"]"
403 <<
" Excuting " <<
cmd.str() <<
"\n";
404 int i = system(
cmd.str().c_str());