17 #include <sys/types.h>
25 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
29 #define SIZE_MAX ((size_t)-1)
32 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
35 ssize_t getline(
char** lineptr,
size_t*
n, FILE*
fp) {
44 if (*lineptr ==
NULL || *n == 0) {
46 *lineptr = (
char*)malloc(*n);
47 if (*lineptr ==
NULL) {
63 if (cur_len + 1 >= *n) {
65 size_t needed = 2 * *n + 1;
68 if (needed_max < needed)
70 if (cur_len + 1 >= needed) {
75 new_lineptr = (
char*)realloc(*lineptr, needed);
76 if (new_lineptr ==
NULL) {
81 *lineptr = new_lineptr;
85 (*lineptr)[cur_len] =
i;
91 (*lineptr)[cur_len] =
'\0';
92 result = cur_len ? (ssize_t)cur_len : result;
102 gettimeofday(&t,
nullptr);
105 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
106 buf[
sizeof(
buf) - 1] = 0;
109 buf2 << buf <<
" " << ((t.tv_usec + 500) / 1000) <<
" ms";
115 : inputDir_(pset.getParameter<std::
string>(
"inputDir")),
116 filePatterns_(pset.getParameter<std::
vector<std::
string> >(
"filePatterns")),
117 inprocessDir_(pset.getParameter<std::
string>(
"inprocessDir")),
118 processedDir_(pset.getParameter<std::
string>(
"processedDir")),
119 corruptedDir_(pset.getParameter<std::
string>(
"corruptedDir")),
120 tokenFile_(pset.getUntrackedParameter<std::
string>(
"tokenFile",
"watcherSourceToken")),
121 timeOut_(pset.getParameter<int>(
"timeOutInSec")),
123 verbosity_(pset.getUntrackedParameter<int>(
"verbosity", 0)) {
130 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
138 for (
unsigned i = 0;
i < dirs.size(); ++
i) {
139 const string&
dir = dirs[
i];
140 struct stat fileStat;
141 if (0 ==
stat(dir.c_str(), &fileStat)) {
142 if (!S_ISDIR(fileStat.st_mode)) {
143 throw cms::Exception(
"[WatcherSource]") <<
"File " << dir <<
" exists but is not a directory "
147 if (0 !=
mkdir(dir.c_str(), 0755)) {
148 throw cms::Exception(
"[WatcherSource]") <<
"Failed to create directory " << dir <<
" for writing data.";
158 return inputFile ? inputFile->
newHeader() :
false;
165 if (inputFile ==
nullptr) {
173 <<
"received wrong message type: expected INIT, got " << header->
code() <<
"\n";
191 return inputFile ==
nullptr ?
nullptr : inputFile->
currentRecord();
195 char* lineptr =
nullptr;
197 static stringstream
cmd;
198 static bool cmdSet =
false;
199 static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
208 cmd <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
212 if (getcwd(curDir,
sizeof(curDir)) ==
nullptr) {
213 throw cms::Exception(
"WatcherSource") <<
"Failed to retreived working directory path: " << strerror(errno);
226 cout <<
"[WatcherSource " <<
now() <<
"]"
227 <<
" Command to retrieve input files: " << cmd.str() <<
"\n";
237 bool waiting =
false;
238 static bool firstWait =
true;
250 FILE*
s = popen(cmd.str().c_str(),
"r");
252 throw cms::Exception(
"WatcherSource") <<
"Failed to retrieve list of input file: " << strerror(errno);
257 if ((len = getline(&lineptr, &n, s)) > 0) {
259 lineptr[len - 1] = 0;
261 if (lineptr[0] !=
'/') {
263 fileName.assign(curDir);
264 fileName.append(
"/");
269 fileName.append(
"/");
271 fileName.append(lineptr);
274 cout <<
"[WatcherSource " <<
now() <<
"]"
275 <<
" File to process: '" << fileName <<
"'\n";
283 cout <<
"[WatcherSource " <<
now() <<
"]"
284 <<
" No file found. Waiting for new file...\n";
287 gettimeofday(&waitStart,
nullptr);
288 }
else if (!firstWait) {
290 gettimeofday(&t,
nullptr);
291 float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
293 cout <<
"[WatcherSource " <<
now() <<
"]"
294 <<
" Having waited for new file for " << (int)dt <<
" sec. "
295 <<
"Timeout exceeded. Exits.\n";
316 time_t
t = time(
nullptr);
320 cout <<
"file size: " << buf.st_size <<
", prev size: " << size <<
"\n";
321 if (buf.st_size == size)
325 if (difftime(t, buf.st_mtime) > 60)
330 if (fd != 0 && buf.st_size == 0) {
335 cout <<
"[WatcherSource " <<
now() <<
"]"
336 <<
" Executing " << c.str() <<
"\n";
337 int i = system(c.str().c_str());
340 cout <<
"[WatcherSource " <<
now() <<
"] "
341 <<
"Failed to move empty file '" <<
fileName_ <<
"'"
342 <<
" to corrupted directory '" << corruptedDir_ <<
"'\n";
351 buf1[buf1.size() - 1] = 0;
355 buf2[buf2.size() - 1] = 0;
357 string dirnam(
dirname(&buf1[0]));
358 string filenam(basename(&buf2[0]));
363 cout <<
"[WatcherSource " <<
now() <<
"]"
364 <<
" Moving file " <<
fileName_ <<
" to " << dest <<
"\n";
370 if (0 != rename(
fileName_.c_str(), dest.c_str())) {
374 << strerror(errno) <<
" (Error no " << errno <<
")";
379 cout <<
"[WatcherSource " <<
now() <<
"]"
384 ofstream
f(
".watcherfile");
387 cout <<
"[WatcherSource " <<
now() <<
"]"
388 <<
" Failed to open file " <<
fileName_ << endl;
412 cout <<
"[WatcherSource " <<
now() <<
"]"
413 <<
" Moving " <<
fileName_ <<
" to " << dest <<
"... ";
414 int i = rename(
fileName_.c_str(), dest.c_str());
417 <<
"Failed to move processed file '" <<
fileName_ <<
"'"
418 <<
" to processed directory '" <<
processedDir_ <<
": " << strerror(errno) <<
" (Error no " << errno <<
")";
424 cout <<
"Done at " <<
now() <<
"\n";
std::vector< std::string > filePatterns_
const edm::EventSetup & c
const EventMsgView * getNextEvent()
std::deque< std::string > filesInQueue_
std::string corruptedDir_
std::string inprocessDir_
WatcherStreamFileReader(edm::ParameterSet const &pset)
const InitMsgView * getHeader()
std::string processedDir_
edm::StreamerInputFile * getInputFile()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
tuple size
Write out results.
~WatcherStreamFileReader()