12 #include <sys/types.h> 24 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L) 28 #define SIZE_MAX ((size_t)-1) 31 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2)) 34 ssize_t getline(
char** lineptr,
size_t*
n, FILE* fp) {
43 if (*lineptr ==
NULL || *n == 0) {
45 *lineptr = (
char*)malloc(*n);
46 if (*lineptr ==
NULL) {
62 if (cur_len + 1 >= *n) {
64 size_t needed = 2 * *n + 1;
67 if (needed_max < needed)
69 if (cur_len + 1 >= needed) {
74 new_lineptr = (
char*)realloc(*lineptr, needed);
75 if (new_lineptr ==
NULL) {
80 *lineptr = new_lineptr;
84 (*lineptr)[cur_len] =
i;
90 (*lineptr)[cur_len] =
'\0';
91 result = cur_len ? (ssize_t)cur_len : result;
101 gettimeofday(&t,
nullptr);
104 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
105 buf[
sizeof(buf) - 1] = 0;
108 buf2 << buf <<
" " << ((t.tv_usec + 500) / 1000) <<
" ms";
114 : inputDir_(pset.getParameter<
std::
string>(
"inputDir")),
115 filePatterns_(pset.getParameter<
std::vector<
std::
string> >(
"filePatterns")),
116 inprocessDir_(pset.getParameter<
std::
string>(
"inprocessDir")),
117 processedDir_(pset.getParameter<
std::
string>(
"processedDir")),
118 corruptedDir_(pset.getParameter<
std::
string>(
"corruptedDir")),
119 tokenFile_(pset.getUntrackedParameter<
std::
string>(
"tokenFile",
"watcherSourceToken")),
120 timeOut_(pset.getParameter<
int>(
"timeOutInSec")),
122 verbosity_(pset.getUntrackedParameter<
int>(
"verbosity", 0)) {
129 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
137 for (
unsigned i = 0;
i < dirs.size(); ++
i) {
138 const string&
dir = dirs[
i];
139 struct stat fileStat;
140 if (0 ==
stat(dir.c_str(), &fileStat)) {
141 if (!S_ISDIR(fileStat.st_mode)) {
142 throw cms::Exception(
"[WatcherSource]") <<
"File " << dir <<
" exists but is not a directory " 146 if (0 !=
mkdir(dir.c_str(), 0755)) {
147 throw cms::Exception(
"[WatcherSource]") <<
"Failed to create directory " << dir <<
" for writing data.";
152 std::stringstream fileListCmdBuf;
153 fileListCmdBuf.str(
"");
159 fileListCmdBuf <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
162 throw cms::Exception(
"WacherSource",
"filePatterns parameter is empty");
163 char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
164 if (getcwd(curDir,
sizeof(curDir)) ==
nullptr) {
165 throw cms::Exception(
"WatcherSource") <<
"Failed to retreived working directory path: " << strerror(errno);
171 fileListCmdBuf <<
"|";
177 fileListCmdBuf <<
")' | sort";
181 cout <<
"[WatcherSource " <<
now() <<
"]" 182 <<
" Command to retrieve input files: " <<
fileListCmd_ <<
"\n";
189 return inputFile ? inputFile->
newHeader() :
false;
196 if (inputFile ==
nullptr) {
204 <<
"received wrong message type: expected INIT, got " << header->
code() <<
"\n";
222 return inputFile ==
nullptr ?
nullptr : inputFile->
currentRecord();
226 char* lineptr =
nullptr;
235 bool waiting =
false;
236 static bool firstWait =
true;
250 throw cms::Exception(
"WatcherSource") <<
"Failed to retrieve list of input file: " << strerror(errno);
255 if ((len = getline(&lineptr, &n, s)) > 0) {
257 lineptr[len - 1] = 0;
259 if (lineptr[0] !=
'/') {
262 fileName.append(
"/");
267 fileName.append(
"/");
269 fileName.append(lineptr);
272 cout <<
"[WatcherSource " <<
now() <<
"]" 273 <<
" File to process: '" << fileName <<
"'\n";
281 cout <<
"[WatcherSource " <<
now() <<
"]" 282 <<
" No file found. Waiting for new file...\n";
285 gettimeofday(&waitStart,
nullptr);
286 }
else if (!firstWait) {
288 gettimeofday(&t,
nullptr);
289 float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
291 cout <<
"[WatcherSource " <<
now() <<
"]" 292 <<
" Having waited for new file for " << (
int)dt <<
" sec. " 293 <<
"Timeout exceeded. Exits.\n";
314 time_t
t =
time(
nullptr);
318 cout <<
"file size: " << buf.st_size <<
", prev size: " << size <<
"\n";
319 if (buf.st_size == size)
323 if (difftime(t, buf.st_mtime) > 60)
328 if (fd != 0 && buf.st_size == 0) {
333 cout <<
"[WatcherSource " <<
now() <<
"]" 334 <<
" Excuting " << c.str() <<
"\n";
335 int i = system(c.str().c_str());
338 cout <<
"[WatcherSource " <<
now() <<
"] " 339 <<
"Failed to move empty file '" <<
fileName_ <<
"'" 340 <<
" to corrupted directory '" << corruptedDir_ <<
"'\n";
349 buf1[buf1.size() - 1] = 0;
353 buf2[buf1.size() - 1] = 0;
355 string dirnam(
dirname(&buf1[0]));
361 cout <<
"[WatcherSource " <<
now() <<
"]" 362 <<
" Moving file " <<
fileName_ <<
" to " << dest <<
"\n";
365 c <<
"/bin/mv -f \"" <<
fileName_ <<
"\" \"" << dest <<
"/.\"";
370 <<
"Failed to move file '" <<
fileName_ <<
"' " 371 <<
"to processing directory " <<
inprocessDir_ <<
": " << strerror(errno);
376 cout <<
"[WatcherSource " <<
now() <<
"]" 381 ofstream
f(
".watcherfile");
384 cout <<
"[WatcherSource " <<
now() <<
"]" 385 <<
" Failed to open file " <<
fileName_ << endl;
401 cout <<
"[WatcherSource " <<
now() <<
"]" 402 <<
" Excuting " << cmd.str() <<
"\n";
403 int i = system(cmd.str().c_str());
406 <<
" to processed directory '" << processedDir_ <<
"'\n";
std::vector< std::string > filePatterns_
const EventMsgView * getNextEvent()
std::deque< std::string > filesInQueue_
std::string corruptedDir_
std::string inprocessDir_
WatcherStreamFileReader(edm::ParameterSet const &pset)
const InitMsgView * getHeader()
std::string processedDir_
edm::StreamerInputFile * getInputFile()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
~WatcherStreamFileReader()