17 #include <sys/types.h> 26 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L) 30 #define SIZE_MAX ((size_t) - 1) 33 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2)) 36 ssize_t getline(
char** lineptr,
size_t*
n, FILE*
fp) {
45 if (*lineptr ==
NULL || *
n == 0) {
48 if (*lineptr ==
NULL) {
64 if (cur_len + 1 >= *
n) {
66 size_t needed = 2 * *
n + 1;
69 if (needed_max < needed)
71 if (cur_len + 1 >= needed) {
76 new_lineptr = (
char*)
realloc(*lineptr, needed);
77 if (new_lineptr ==
NULL) {
82 *lineptr = new_lineptr;
86 (*lineptr)[cur_len] =
i;
92 (*lineptr)[cur_len] =
'\0';
103 gettimeofday(&
t,
nullptr);
106 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
110 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
118 inprocessDir_(
pset.getParameter<
std::
string>(
"inprocessDir")),
119 processedDir_(
pset.getParameter<
std::
string>(
"processedDir")),
120 corruptedDir_(
pset.getParameter<
std::
string>(
"corruptedDir")),
121 tokenFile_(
pset.getUntrackedParameter<
std::
string>(
"tokenFile",
"watcherSourceToken")),
122 timeOut_(
pset.getParameter<
int>(
"timeOutInSec")),
124 verbosity_(
pset.getUntrackedParameter<
int>(
"verbosity", 0)) {
131 throw cms::Exception(
"WatcherSource") <<
"Failed to create token file.";
139 for (
unsigned i = 0;
i <
dirs.size(); ++
i) {
141 struct stat fileStat;
142 if (0 ==
stat(
dir.c_str(), &fileStat)) {
143 if (!S_ISDIR(fileStat.st_mode)) {
144 throw cms::Exception(
"[WatcherSource]") <<
"File " <<
dir <<
" exists but is not a directory " 148 if (0 !=
mkdir(
dir.c_str(), 0755)) {
149 throw cms::Exception(
"[WatcherSource]") <<
"Failed to create directory " <<
dir <<
" for writing data.";
169 if (
header->code() != Header::INIT)
171 <<
"received wrong message type: expected INIT, got " <<
header->code() <<
"\n";
192 char* lineptr =
nullptr;
194 static stringstream
cmd;
195 static bool cmdSet =
false;
196 static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
205 cmd <<
"/bin/find " <<
inputDir_ <<
" -maxdepth 2 -print | egrep '(";
209 if (getcwd(curDir,
sizeof(curDir)) ==
nullptr) {
210 throw cms::Exception(
"WatcherSource") <<
"Failed to retreived working directory path: " << strerror(errno);
223 cout <<
"[WatcherSource " <<
now() <<
"]" 224 <<
" Command to retrieve input files: " <<
cmd.str() <<
"\n";
234 bool waiting =
false;
235 static bool firstWait =
true;
247 FILE*
s = popen(
cmd.str().c_str(),
"r");
249 throw cms::Exception(
"WatcherSource") <<
"Failed to retrieve list of input file: " << strerror(errno);
254 if ((len = getline(&lineptr, &
n,
s)) > 0) {
256 lineptr[len - 1] = 0;
258 if (lineptr[0] !=
'/') {
271 cout <<
"[WatcherSource " <<
now() <<
"]" 272 <<
" File to process: '" <<
fileName <<
"'\n";
280 cout <<
"[WatcherSource " <<
now() <<
"]" 281 <<
" No file found. Waiting for new file...\n";
284 gettimeofday(&waitStart,
nullptr);
285 }
else if (!firstWait) {
287 gettimeofday(&
t,
nullptr);
288 float dt = (
t.tv_sec - waitStart.tv_sec) * 1. + (
t.tv_usec - waitStart.tv_usec) * 1.e-6;
290 cout <<
"[WatcherSource " <<
now() <<
"]" 291 <<
" Having waited for new file for " << (
int)
dt <<
" sec. " 292 <<
"Timeout exceeded. Exits.\n";
313 time_t
t =
time(
nullptr);
317 cout <<
"file size: " <<
buf.st_size <<
", prev size: " <<
size <<
"\n";
322 if (difftime(
t,
buf.st_mtime) > 60)
327 if (
fd != 0 &&
buf.st_size == 0) {
332 cout <<
"[WatcherSource " <<
now() <<
"]" 333 <<
" Executing " <<
c.str() <<
"\n";
334 int i = system(
c.str().c_str());
337 cout <<
"[WatcherSource " <<
now() <<
"] " 338 <<
"Failed to move empty file '" <<
fileName_ <<
"'" 348 buf1[buf1.size() - 1] = 0;
352 buf2[buf2.size() - 1] = 0;
354 string dirnam(
dirname(&buf1[0]));
355 string filenam(basename(&buf2[0]));
360 cout <<
"[WatcherSource " <<
now() <<
"]" 371 << strerror(errno) <<
" (Error no " << errno <<
")";
376 cout <<
"[WatcherSource " <<
now() <<
"]" 381 ofstream
f(
".watcherfile");
384 cout <<
"[WatcherSource " <<
now() <<
"]" 385 <<
" Failed to open file " <<
fileName_ << endl;
411 cout <<
"[WatcherSource " <<
now() <<
"]" 416 <<
"Failed to move processed file '" <<
fileName_ <<
"'" 417 <<
" to processed directory '" <<
processedDir_ <<
": " << strerror(errno) <<
" (Error no " << errno <<
")";
423 cout <<
"Done at " <<
now() <<
"\n";
std::vector< std::string > filePatterns_
const edm::streamer::EventMsgView * getNextEvent()
std::deque< std::string > filesInQueue_
std::string corruptedDir_
constexpr char const *const kStop
std::unique_ptr< edm::streamer::StreamerInputFile > streamerInputFile_
std::string inprocessDir_
edm::streamer::StreamerInputFile * getInputFile()
WatcherStreamFileReader(edm::ParameterSet const &pset)
void free(void *ptr) noexcept
const edm::streamer::InitMsgView * getHeader()
std::string processedDir_
void * malloc(size_t size) noexcept
void * realloc(void *ptr, size_t size) noexcept
~WatcherStreamFileReader()