12 #include <sys/types.h>
25 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
29 # define SIZE_MAX ((size_t) -1)
32 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
35 ssize_t getline (
char **lineptr,
size_t *
n, FILE *fp)
46 if (*lineptr ==
NULL || *n == 0)
49 *lineptr = (
char *) malloc (*n);
69 if (cur_len + 1 >= *n)
73 size_t needed = 2 * *n + 1;
76 if (needed_max < needed)
78 if (cur_len + 1 >= needed)
84 new_lineptr = (
char *) realloc (*lineptr, needed);
85 if (new_lineptr ==
NULL)
91 *lineptr = new_lineptr;
95 (*lineptr)[cur_len] =
i;
101 (*lineptr)[cur_len] =
'\0';
102 result = cur_len ? (ssize_t) cur_len : result;
115 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
116 buf[
sizeof(buf)-1] = 0;
119 buf2 << buf <<
" " << ((t.tv_usec+500)/1000) <<
" ms";
125 inputDir_(pset.getParameter<std::
string>(
"inputDir")),
126 filePatterns_(pset.getParameter<std::vector<std::
string> >(
"filePatterns")),
127 inprocessDir_(pset.getParameter<std::
string>(
"inprocessDir")),
128 processedDir_(pset.getParameter<std::
string>(
"processedDir")),
129 corruptedDir_(pset.getParameter<std::
string>(
"corruptedDir")),
130 tokenFile_(pset.getUntrackedParameter<std::
string>(
"tokenFile",
131 "watcherSourceToken")),
132 timeOut_(pset.getParameter<int>(
"timeOutInSec")),
134 verbosity_(pset.getUntrackedParameter<int>(
"verbosity", 0)){
141 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
149 for(
unsigned i = 0;
i < dirs.size(); ++
i){
150 const string&
dir = dirs[
i];
151 struct stat fileStat;
152 if(0==stat(dir.c_str(), &fileStat)){
153 if(!S_ISDIR(fileStat.st_mode)){
155 <<
"File " << dir <<
" exists but is not a directory "
159 if(0!=mkdir(dir.c_str(), 0755)){
161 <<
"Failed to create directory " << dir
162 <<
" for writing data.";
173 return inputFile?inputFile->
newHeader():
false;
189 <<
"received wrong message type: expected INIT, got "
190 << header->
code() <<
"\n";
202 && inputFile->
next()==0){
212 static stringstream
cmd;
213 static bool cmdSet =
false;
214 static char curDir[PATH_MAX>0?PATH_MAX:4096];
218 cmd <<
"/bin/ls -rt " <<
inputDir_ <<
" | egrep '(";
221 if(getcwd(curDir,
sizeof(curDir))==0){
223 <<
"Failed to retreived working directory path: "
236 cout <<
"[WatcherSource " <<
now() <<
"]"
237 <<
" Command to retrieve input files: "
238 << cmd.str() <<
"\n";
248 bool waiting =
false;
249 static bool firstWait =
true;
261 FILE*
s = popen(cmd.str().c_str(),
"r");
264 <<
"Failed to retrieve list of input file: " << strerror(errno);
269 if((len=getline(&lineptr, &n, s))>0){
274 fileName.assign(curDir);
275 fileName.append(
"/");
280 fileName.append(
"/");
281 fileName.append(lineptr);
284 <<
" File to process: '"
285 << fileName <<
"'\n";
288 while(!feof(s)) fgetc(s);
292 cout <<
"[WatcherSource " <<
now() <<
"]"
293 <<
" No file found. Waiting for new file...\n";
296 gettimeofday(&waitStart, 0);
297 }
else if(!firstWait){
300 float dt = (t.tv_sec-waitStart.tv_sec) * 1.
301 + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
303 cout <<
"[WatcherSource " <<
now() <<
"]"
304 <<
" Having waited for new file for " << (int)dt <<
" sec. "
305 <<
"Timeout exceeded. Exits.\n";
315 free(lineptr); lineptr=0;
329 if(
verbosity_)
cout <<
"file size: " << buf.st_size <<
", prev size: " << size <<
"\n";
330 if(buf.st_size==size)
break;
else size = buf.st_size;
331 if(difftime(t,buf.st_mtime)>60)
break;
335 if(fd!=0 && buf.st_size == 0){
343 int i = system(c.str().c_str());
346 cout <<
"[WatcherSource " <<
now() <<
"] "
347 <<
"Failed to move empty file '" <<
fileName_ <<
"'"
348 <<
" to corrupted directory '" << corruptedDir_ <<
"'\n";
357 buf1[buf1.size()-1] = 0;
361 buf2[buf1.size()-1] = 0;
363 string dirnam(dirname(&buf1[0]));
364 string filenam(basename(&buf2[0]));
372 if(0!=rename(
fileName_.c_str(), dest.c_str())){
374 <<
"Failed to move file '" <<
fileName_ <<
"' "
376 <<
": " << strerror(errno);
381 cout <<
"[WatcherSource " <<
now() <<
"]"
382 <<
" Opening file " <<
fileName_ <<
"\n" << flush;
386 ofstream
f(
".watcherfile");
389 cout <<
"[WatcherSource " <<
now() <<
"]"
390 <<
" Failed to open file " <<
fileName_ << endl;
405 <<
" Excuting " << cmd.str() <<
"\n";
406 int i = system(cmd.str().c_str());
409 <<
"Failed to move processed file '" <<
fileName_ <<
"'"
410 <<
" to processed directory '" << processedDir_ <<
"'\n";
std::vector< std::string > filePatterns_
const EventMsgView * getNextEvent()
std::deque< std::string > filesInQueue_
std::string corruptedDir_
std::string inprocessDir_
WatcherStreamFileReader(edm::ParameterSet const &pset)
const InitMsgView * getHeader()
std::string processedDir_
std::auto_ptr< edm::StreamerInputFile > streamerInputFile_
edm::StreamerInputFile * getInputFile()
volatile std::atomic< bool > shutdown_flag false
tuple size
Write out results.
~WatcherStreamFileReader()
static std::string fileName_