12 #include <sys/types.h>
24 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
28 #define SIZE_MAX ((size_t)-1)
31 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
34 ssize_t getline(
char** lineptr,
size_t*
n, FILE*
fp) {
43 if (*lineptr ==
NULL || *
n == 0) {
45 *lineptr = (
char*)malloc(*
n);
46 if (*lineptr ==
NULL) {
62 if (cur_len + 1 >= *
n) {
64 size_t needed = 2 * *
n + 1;
67 if (needed_max < needed)
69 if (cur_len + 1 >= needed) {
74 new_lineptr = (
char*)realloc(*lineptr, needed);
75 if (new_lineptr ==
NULL) {
80 *lineptr = new_lineptr;
84 (*lineptr)[cur_len] =
i;
90 (*lineptr)[cur_len] =
'\0';
101 gettimeofday(&
t,
nullptr);
104 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
108 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
115 filePatterns_(
pset.getParameter<
std::vector<
std::
string> >(
"filePatterns")),
116 inprocessDir_(
pset.getParameter<
std::
string>(
"inprocessDir")),
117 processedDir_(
pset.getParameter<
std::
string>(
"processedDir")),
118 corruptedDir_(
pset.getParameter<
std::
string>(
"corruptedDir")),
119 tokenFile_(
pset.getUntrackedParameter<
std::
string>(
"tokenFile",
"watcherSourceToken")),
120 timeOut_(
pset.getParameter<
int>(
"timeOutInSec")),
122 verbosity_(
pset.getUntrackedParameter<
int>(
"verbosity", 0)) {
129 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
137 for (
unsigned i = 0;
i <
dirs.size(); ++
i) {
139 struct stat fileStat;
140 if (0 ==
stat(
dir.c_str(), &fileStat)) {
141 if (!S_ISDIR(fileStat.st_mode)) {
142 throw cms::Exception(
"[WatcherSource]") <<
"File " <<
dir <<
" exists but is not a directory "
146 if (0 !=
mkdir(
dir.c_str(), 0755)) {
147 throw cms::Exception(
"[WatcherSource]") <<
"Failed to create directory " <<
dir <<
" for writing data.";
152 std::stringstream fileListCmdBuf;
153 fileListCmdBuf.str(
"");
159 fileListCmdBuf <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
162 throw cms::Exception(
"WacherSource",
"filePatterns parameter is empty");
163 char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
164 if (getcwd(curDir,
sizeof(curDir)) ==
nullptr) {
165 throw cms::Exception(
"WatcherSource") <<
"Failed to retreived working directory path: " << strerror(errno);
171 fileListCmdBuf <<
"|";
177 fileListCmdBuf <<
")' | sort";
181 cout <<
"[WatcherSource " <<
now() <<
"]"
182 <<
" Command to retrieve input files: " <<
fileListCmd_ <<
"\n";
204 <<
"received wrong message type: expected INIT, got " <<
header->code() <<
"\n";
226 char* lineptr =
nullptr;
235 bool waiting =
false;
236 static bool firstWait =
true;
250 throw cms::Exception(
"WatcherSource") <<
"Failed to retrieve list of input file: " << strerror(errno);
255 if ((len = getline(&lineptr, &
n,
s)) > 0) {
257 lineptr[len - 1] = 0;
259 if (lineptr[0] !=
'/') {
272 cout <<
"[WatcherSource " <<
now() <<
"]"
273 <<
" File to process: '" <<
fileName <<
"'\n";
281 cout <<
"[WatcherSource " <<
now() <<
"]"
282 <<
" No file found. Waiting for new file...\n";
285 gettimeofday(&waitStart,
nullptr);
286 }
else if (!firstWait) {
288 gettimeofday(&
t,
nullptr);
289 float dt = (
t.tv_sec - waitStart.tv_sec) * 1. + (
t.tv_usec - waitStart.tv_usec) * 1.e-6;
291 cout <<
"[WatcherSource " <<
now() <<
"]"
292 <<
" Having waited for new file for " << (
int)
dt <<
" sec. "
293 <<
"Timeout exceeded. Exits.\n";
314 time_t
t =
time(
nullptr);
318 cout <<
"file size: " <<
buf.st_size <<
", prev size: " <<
size <<
"\n";
323 if (difftime(
t,
buf.st_mtime) > 60)
328 if (
fd != 0 &&
buf.st_size == 0) {
333 cout <<
"[WatcherSource " <<
now() <<
"]"
334 <<
" Excuting " <<
c.str() <<
"\n";
335 int i = system(
c.str().c_str());
338 cout <<
"[WatcherSource " <<
now() <<
"] "
339 <<
"Failed to move empty file '" <<
fileName_ <<
"'"
349 buf1[buf1.size() - 1] = 0;
353 buf2[buf1.size() - 1] = 0;
355 string dirnam(
dirname(&buf1[0]));
356 string filenam(basename(&buf2[0]));
361 cout <<
"[WatcherSource " <<
now() <<
"]"
370 <<
"Failed to move file '" <<
fileName_ <<
"' "
371 <<
"to processing directory " <<
inprocessDir_ <<
": " << strerror(errno);
376 cout <<
"[WatcherSource " <<
now() <<
"]"
381 ofstream
f(
".watcherfile");
384 cout <<
"[WatcherSource " <<
now() <<
"]"
385 <<
" Failed to open file " <<
fileName_ << endl;
401 cout <<
"[WatcherSource " <<
now() <<
"]"
402 <<
" Excuting " <<
cmd.str() <<
"\n";
403 int i = system(
cmd.str().c_str());