12 #include <sys/types.h>
25 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
29 # define SIZE_MAX ((size_t) -1)
32 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
// Fallback getline(): stores characters into the caller's buffer *lineptr
// (capacity *n), enlarging the buffer with realloc() when it fills up.
// NOTE(review): this listing is an excerpt -- the embedded numbers (35, 46,
// 49, ...) skip lines, so the actual read from fp, the failure branches and
// the final return statement are not visible here.
35 ssize_t getline (
char **lineptr,
size_t *
n, FILE *fp)
// No usable caller buffer: allocate one of *n bytes (presumably *n is first
// set to a default size on a line not shown -- TODO confirm).
46 if (*lineptr ==
NULL || *n == 0)
49 *lineptr = (
char *) malloc (*n);
// Grow when the next character plus the terminating NUL would not fit.
69 if (cur_len + 1 >= *n)
// Candidate capacity: one more than twice the current capacity.
73 size_t needed = 2 * *n + 1;
// needed_max looks like an upper bound on the capacity; its definition and
// the body of this test are not shown -- TODO confirm.
76 if (needed_max < needed)
// Candidate capacity still cannot hold cur_len + 1 bytes; the handling of
// this case is not shown.
78 if (cur_len + 1 >= needed)
84 new_lineptr = (
char *) realloc (*lineptr, needed);
// realloc() failed; *lineptr has not been overwritten at this point.
85 if (new_lineptr ==
NULL)
91 *lineptr = new_lineptr;
// Append the current character (held in i) to the line.
95 (*lineptr)[cur_len] =
i;
// NUL-terminate the line. The result becomes the stored length when at least
// one character was stored; otherwise the previously assigned result is kept.
101 (*lineptr)[cur_len] =
'\0';
102 result = cur_len ? (ssize_t) cur_len : result;
// Timestamp formatting (the enclosing function header is not part of this
// excerpt; presumably this is the now() helper used in the log messages).
// Formats the local time of t.tv_sec into buf with the pattern "%F %R %S s".
115 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
// Force the last byte of buf to NUL regardless of what strftime() wrote.
116 buf[
sizeof(buf)-1] = 0;
// Append the sub-second part, rounded to the nearest millisecond.
// NOTE(review): for t.tv_usec >= 999500 this expression yields 1000, i.e.
// the text "1000 ms" -- confirm that is acceptable.
119 buf2 << buf <<
" " << ((t.tv_usec+500)/1000) <<
" ms";
// Member initializer list: every setting is read from the parameter set
// `pset`. (The constructor signature line is not part of this excerpt.)
// Directory that is scanned for input files.
125 inputDir_(pset.getParameter<std::
string>(
"inputDir")),
// List of patterns used to select input files.
126 filePatterns_(pset.getParameter<std::vector<std::
string> >(
"filePatterns")),
// Directories named "inprocessDir", "processedDir" and "corruptedDir" in the
// configuration.
127 inprocessDir_(pset.getParameter<std::
string>(
"inprocessDir")),
128 processedDir_(pset.getParameter<std::
string>(
"processedDir")),
129 corruptedDir_(pset.getParameter<std::
string>(
"corruptedDir")),
// Untracked parameter; falls back to "watcherSourceToken" when not set.
130 tokenFile_(pset.getUntrackedParameter<std::
string>(
"tokenFile",
131 "watcherSourceToken")),
// Timeout value, configured under the key "timeOutInSec".
132 timeOut_(pset.getParameter<int>(
"timeOutInSec")),
// Untracked verbosity level; defaults to 0.
134 verbosity_(pset.getUntrackedParameter<int>(
"verbosity", 0)){
// Raised with category "WatcherSource"; the failing condition is on a line
// not shown (presumably the token file could not be created/opened).
141 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
// Checks every path in `dirs` (the construction of `dirs` is not shown).
149 for(
unsigned i = 0;
i < dirs.size(); ++
i){
150 const string&
dir = dirs[
i];
151 struct stat fileStat;
// stat() returns 0 when the path exists and could be examined.
152 if(0==stat(dir.c_str(), &fileStat)){
// The path exists but is not a directory: report it (the statement that
// receives this message is not shown; presumably an exception is thrown).
153 if(!S_ISDIR(fileStat.st_mode)){
155 <<
"File " << dir <<
" exists but is not a directory "
// Create the directory with mode 0755 (presumably reached only when the
// stat() above failed -- the else line is not shown); a non-zero return
// from mkdir() is reported as an error.
159 if(0!=
mkdir(dir.c_str(), 0755)){
161 <<
"Failed to create directory " << dir
162 <<
" for writing data.";
// Builds, as text, the shell pipeline used to list candidate input files:
//   /bin/find <inputDir_> -maxdepth 2 -print | egrep '(<alt1>|<alt2>...)' | sort
167 std::stringstream fileListCmdBuf;
168 fileListCmdBuf.str(
"");
// NOTE(review): inputDir_ is inserted unquoted into a shell command line;
// a path containing spaces or shell metacharacters would break it -- confirm
// that the configuration is trusted.
174 fileListCmdBuf <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
// Buffer for the current working directory; sized PATH_MAX when that macro
// is positive, 4096 bytes otherwise.
177 char curDir[PATH_MAX>0?PATH_MAX:4096];
// getcwd() returns a null pointer on failure.
178 if(getcwd(curDir,
sizeof(curDir))==0){
180 <<
"Failed to retreived working directory path: "
// Separator between egrep alternatives, emitted before every alternative
// except the first (the surrounding loop is not shown; presumably it runs
// over filePatterns_).
186 if(
i>0) fileListCmdBuf <<
"|";
// Close the egrep expression and sort the resulting list.
192 fileListCmdBuf <<
")' | sort";
// Log the command (the condition guarding this output, if any, is not shown).
196 cout <<
"[WatcherSource " <<
now() <<
"]"
197 <<
" Command to retrieve input files: "
// Three short fragments from separate member functions whose headers are not
// part of this excerpt.
// Null-safe forward: yields false when there is no current input file,
// otherwise the result of inputFile->newHeader().
207 return inputFile?inputFile->
newHeader():
false;
// Tail of a diagnostic reporting that the message read from `header` is not
// of the expected INIT type; header->code() is included in the text.
223 <<
"received wrong message type: expected INIT, got "
224 << header->
code() <<
"\n";
// Trailing clause of a condition testing that inputFile->next() returned 0
// (the meaning of 0 is defined by the input-file class, not visible here).
236 && inputFile->
next()==0){
// Fragment of the input-file lookup (function header and loop structure are
// not part of this excerpt; presumably getInputFile()).
253 bool waiting =
false;
// static: the value persists across calls of the enclosing function.
254 static bool firstWait =
true;
// Error text built with strerror(errno); the failing call is not shown
// (presumably the pipe `s` to the file-listing command could not be opened).
269 <<
"Failed to retrieve list of input file: " << strerror(errno);
// Read one line from the stream s; only a positive length is treated as a
// listed file.
274 if((len=getline(&lineptr, &n, s))>0){
// A listed path that does not start with '/' is relative: path components
// followed by "/" are put in front of it (the components themselves are
// appended on lines not shown).
278 if(lineptr[0] !=
'/'){
281 fileName.append(
"/");
286 fileName.append(
"/");
288 fileName.append(lineptr);
291 <<
" File to process: '"
292 << fileName <<
"'\n";
// Consume the rest of the stream until end-of-file.
295 while(!feof(s)) fgetc(s);
299 cout <<
"[WatcherSource " <<
now() <<
"]"
300 <<
" No file found. Waiting for new file...\n";
// Remember when waiting started.
303 gettimeofday(&waitStart, 0);
304 }
else if(!firstWait){
// Elapsed waiting time in seconds (whole seconds plus microseconds).
307 float dt = (t.tv_sec-waitStart.tv_sec) * 1.
308 + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
// Timeout report; the comparison that triggers it is not shown (presumably
// dt against timeOut_ -- TODO confirm).
310 cout <<
"[WatcherSource " <<
now() <<
"]"
311 <<
" Having waited for new file for " << (int)dt <<
" sec. "
312 <<
"Timeout exceeded. Exits.\n";
// Release the buffer filled by getline() and clear the pointer.
322 free(lineptr); lineptr=0;
// File-size stability check (the enclosing loop and the stat call filling
// `buf` are not part of this excerpt).
336 if(
verbosity_)
cout <<
"file size: " << buf.st_size <<
", prev size: " << size <<
"\n";
// Leave the loop once the size equals the previously recorded size;
// otherwise record the new size.
337 if(buf.st_size==size)
break;
else size = buf.st_size;
// Also leave the loop when the modification time is more than 60 seconds
// before t.
338 if(difftime(t,buf.st_mtime)>60)
break;
// Empty file handling.
// NOTE(review): if fd is a descriptor returned by open(), failure is
// signalled by -1 rather than 0, so `fd!=0` would not detect it -- confirm.
342 if(fd!=0 && buf.st_size == 0){
// Run the shell command accumulated in c (its construction is not shown).
350 int i = system(c.str().c_str());
353 cout <<
"[WatcherSource " <<
now() <<
"] "
354 <<
"Failed to move empty file '" <<
fileName_ <<
"'"
355 <<
" to corrupted directory '" << corruptedDir_ <<
"'\n";
// buf1 and buf2 are separate writable buffers (filled on lines not shown);
// each is NUL-terminated and handed to dirname()/basename(), which may modify
// their argument.
364 buf1[buf1.size()-1] = 0;
// NOTE(review): buf2 is terminated at an index computed from buf1.size();
// this is only correct if both buffers have the same size -- confirm.
368 buf2[buf1.size()-1] = 0;
370 string dirnam(
dirname(&buf1[0]));
371 string filenam(basename(&buf2[0]));
// Textual "/bin/mv -f" command for the move to `dest` (the rest of the
// statement is not shown).
380 c <<
"/bin/mv -f \"" <<
fileName_ <<
"\" \"" << dest
// The move is attempted with rename(); a non-zero return is reported
// together with strerror(errno).
384 if(0!=rename(
fileName_.c_str(), dest.c_str())){
387 <<
"Failed to move file '" <<
fileName_ <<
"' "
389 <<
": " << strerror(errno);
// Progress message; flushed immediately.
394 cout <<
"[WatcherSource " <<
now() <<
"]"
395 <<
" Opening file " <<
fileName_ <<
"\n" << flush;
// Opens ".watcherfile" (relative to the working directory) for writing; what
// is written to it is not shown.
399 ofstream
f(
".watcherfile");
// Failure message for opening the input file (the failing condition is not
// shown).
402 cout <<
"[WatcherSource " <<
now() <<
"]"
403 <<
" Failed to open file " <<
fileName_ << endl;
// Moves the processed file away by running a shell command (the enclosing
// function header and the construction of `cmd` are not part of this
// excerpt). The command text is logged first.
418 <<
" Excuting " << cmd.str() <<
"\n";
// Execute the command through the shell; i receives system()'s return value.
419 int i = system(cmd.str().c_str());
// Message emitted when the move failed (the test on i is not shown).
422 <<
"Failed to move processed file '" <<
fileName_ <<
"'"
423 <<
" to processed directory '" << processedDir_ <<
"'\n";
std::vector< std::string > filePatterns_
const EventMsgView * getNextEvent()
std::deque< std::string > filesInQueue_
std::string corruptedDir_
std::string inprocessDir_
WatcherStreamFileReader(edm::ParameterSet const &pset)
const InitMsgView * getHeader()
std::string processedDir_
std::auto_ptr< edm::StreamerInputFile > streamerInputFile_
edm::StreamerInputFile * getInputFile()
volatile std::atomic< bool > shutdown_flag false
tuple size
Write out results.
~WatcherStreamFileReader()