CMS 3D CMS Logo

WatcherStreamFileReader.cc
Go to the documentation of this file.
5 
6 #include <cerrno>
7 #include <climits>
8 #include <cstdio>
9 #include <cstdlib>
10 #include <cstring>
11 #include <fcntl.h>
12 #include <fstream>
13 #include <libgen.h>
14 #include <memory>
15 #include <sys/stat.h>
16 #include <sys/time.h>
17 #include <sys/types.h>
18 #include <unistd.h>
19 
20 //using namespace edm;
21 using namespace std;
22 
23 //std::string WatcherStreamFileReader::fileName_;
24 
25 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
26 /* getline implementation is copied from glibc. */
27 
28 #ifndef SIZE_MAX
29 #define SIZE_MAX ((size_t)-1)
30 #endif
31 #ifndef SSIZE_MAX
32 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
33 #endif
namespace {
  /// Fallback implementation of POSIX getline(3) for platforms whose C library
  /// does not provide it (logic adapted from the glibc/gnulib version).
  ///
  /// Reads characters from @p fp up to and including the next '\n' (or until EOF)
  /// into the heap buffer *lineptr, growing it with realloc() as needed, and
  /// NUL-terminates the result.
  ///
  /// @param lineptr in/out: pointer to a malloc()/realloc()-compatible buffer; may
  ///                point to nullptr, in which case a buffer is allocated. The
  ///                caller owns the buffer and must free() it.
  /// @param n       in/out: capacity of *lineptr in bytes (updated on growth).
  /// @param fp      stream to read from.
  /// @return number of characters stored (including the '\n', excluding the
  ///         terminating NUL), or -1 when:
  ///         - nothing was read before EOF;
  ///         - an argument is null (errno = EINVAL);
  ///         - the line would exceed SSIZE_MAX (errno = EOVERFLOW);
  ///         - the buffer cannot be (re)allocated.
  ssize_t getline(char** lineptr, size_t* n, FILE* fp) {
    if (lineptr == nullptr || n == nullptr || fp == nullptr) {
      errno = EINVAL;
      return -1;
    }

    if (*lineptr == nullptr || *n == 0) {
      // realloc() rather than malloc(): a non-null buffer with *n == 0 is reused
      // instead of leaked, and realloc(nullptr, size) behaves like malloc(size).
      const size_t initialSize = 120;
      char* initial = static_cast<char*>(realloc(*lineptr, initialSize));
      if (initial == nullptr)
        return -1;  // *lineptr and *n are left untouched
      *lineptr = initial;
      *n = initialSize;
    }

    // The returned length must fit in ssize_t.
    const size_t needed_max =
        static_cast<size_t>(SSIZE_MAX) < SIZE_MAX ? static_cast<size_t>(SSIZE_MAX) + 1 : SIZE_MAX;

    size_t cur_len = 0;
    for (;;) {
      const int c = getc(fp);
      if (c == EOF)
        break;

      // Make room for this character plus the final NUL.
      if (cur_len + 1 >= *n) {
        // Grow geometrically, without letting 2 * *n + 1 wrap around.
        const size_t needed = *n < needed_max / 2 ? 2 * *n + 1 : needed_max;
        if (cur_len + 1 >= needed) {
          errno = EOVERFLOW;
          return -1;
        }

        char* grown = static_cast<char*>(realloc(*lineptr, needed));
        if (grown == nullptr)
          return -1;
        *lineptr = grown;
        *n = needed;
      }

      (*lineptr)[cur_len++] = static_cast<char>(c);
      if (c == '\n')
        break;
    }

    (*lineptr)[cur_len] = '\0';
    return cur_len != 0 ? static_cast<ssize_t>(cur_len) : -1;
  }
}  // namespace
98 #endif
99 
/// Returns the current local time formatted for log prefixes.
///
/// The format is strftime("%F %R %S s") followed by " <milliseconds> ms",
/// e.g. "2024-01-31 14:05 07 s 123 ms".
static std::string now() {
  struct timeval t;
  gettimeofday(&t, nullptr);

  // localtime_r() is used because localtime() returns a pointer to a shared static buffer.
  struct tm local;
  char buf[256];
  if (localtime_r(&t.tv_sec, &local) == nullptr || strftime(buf, sizeof(buf), "%F %R %S s", &local) == 0) {
    // When strftime() returns 0 the buffer contents are unspecified.
    buf[0] = '\0';
  }

  std::ostringstream out;
  // Truncate rather than round the microseconds. Rounding could print
  // "1000 ms" without carrying into the seconds field.
  out << buf << " " << (t.tv_usec / 1000) << " ms";
  return out.str();
}
113 
115  : inputDir_(pset.getParameter<std::string>("inputDir")),
116  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
117  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
118  processedDir_(pset.getParameter<std::string>("processedDir")),
119  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
120  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
121  timeOut_(pset.getParameter<int>("timeOutInSec")),
122  end_(false),
123  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
124  struct stat buf;
125  if (stat(tokenFile_.c_str(), &buf)) {
126  FILE* f = fopen(tokenFile_.c_str(), "w");
127  if (f) {
128  fclose(f);
129  } else {
130  throw cms::Exception("WatcherSource") << "Failed to create token file.";
131  }
132  }
133  vector<string> dirs;
134  dirs.push_back(inprocessDir_);
135  dirs.push_back(processedDir_);
136  dirs.push_back(corruptedDir_);
137 
138  for (unsigned i = 0; i < dirs.size(); ++i) {
139  const string& dir = dirs[i];
140  struct stat fileStat;
141  if (0 == stat(dir.c_str(), &fileStat)) {
142  if (!S_ISDIR(fileStat.st_mode)) {
143  throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
144  << " as expected.";
145  }
146  } else { //directory does not exists, let's try to create it
147  if (0 != mkdir(dir.c_str(), 0755)) {
148  throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
149  }
150  }
151  }
152 
153  std::stringstream fileListCmdBuf;
154  fileListCmdBuf.str("");
155  // fileListCmdBuf << "/bin/ls -rt " << inputDir_ << " | egrep '(";
156  //by default ls will sort the file alphabetically which will results
157  //in ordering the files in increasing LB number, which is the desired
158  //order.
159  // fileListCmdBuf << "/bin/ls " << inputDir_ << " | egrep '(";
160  fileListCmdBuf << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
161  //TODO: validate patternDir (see ;, &&, ||) and escape special character
162  if (filePatterns_.empty())
163  throw cms::Exception("WacherSource", "filePatterns parameter is empty");
164  char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
165  if (getcwd(curDir, sizeof(curDir)) == nullptr) {
166  throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
167  }
168  curDir_ = curDir;
169 
170  for (unsigned i = 0; i < filePatterns_.size(); ++i) {
171  if (i > 0)
172  fileListCmdBuf << "|";
173  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
174  // fileListCmdBuf << curDir << "/";
175  // }
176  fileListCmdBuf << filePatterns_[i];
177  }
178  fileListCmdBuf << ")' | sort";
179 
180  fileListCmd_ = fileListCmdBuf.str();
181 
182  cout << "[WatcherSource " << now() << "]"
183  << " Command to retrieve input files: " << fileListCmd_ << "\n";
184 }
185 
187 
190  return inputFile ? inputFile->newHeader() : false;
191 }
192 
195 
196  //TODO: shall better send an exception...
197  if (inputFile == nullptr) {
198  throw cms::Exception("WatcherSource") << "No input file found.";
199  }
200 
201  const InitMsgView* header = inputFile->startMessage();
202 
203  if (header->code() != Header::INIT) //INIT Msg
204  throw cms::Exception("readHeader", "WatcherStreamFileReader")
205  << "received wrong message type: expected INIT, got " << header->code() << "\n";
206 
207  return header;
208 }
209 
211  if (end_) {
212  closeFile();
213  return nullptr;
214  }
215 
217 
218  //go to next input file, till no new event is found
219  while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
220  closeFile();
221  }
222 
223  return inputFile == nullptr ? nullptr : inputFile->currentRecord();
224 }
225 
227  char* lineptr = nullptr;
228  size_t n = 0;
229 
230  struct stat buf;
231 
232  if (stat(tokenFile_.c_str(), &buf) != 0) {
233  end_ = true;
234  }
235 
236  bool waiting = false;
237  static bool firstWait = true;
238  timeval waitStart;
239  //if no cached input file, look for new files until one is found:
240  if (!end_ && streamerInputFile_.get() == nullptr) {
241  fileName_.assign("");
242 
243  //check if we have file in the queue, if not look for new files:
244  while (filesInQueue_.empty()) {
245  if (stat(tokenFile_.c_str(), &buf) != 0) {
246  end_ = true;
247  break;
248  }
249  FILE* s = popen(fileListCmd_.c_str(), "r");
250  if (s == nullptr) {
251  throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
252  }
253 
254  ssize_t len;
255  while (!feof(s)) {
256  if ((len = getline(&lineptr, &n, s)) > 0) {
257  //remove end-of-line character:
258  lineptr[len - 1] = 0;
259  string fileName;
260  if (lineptr[0] != '/') {
261  if (!inputDir_.empty() && inputDir_[0] != '/') { //relative path
262  fileName.assign(curDir_);
263  fileName.append("/");
264  fileName.append(inputDir_);
265  } else {
266  fileName.assign(inputDir_);
267  }
268  fileName.append("/");
269  }
270  fileName.append(lineptr);
271  filesInQueue_.push_back(fileName);
272  if (verbosity_)
273  cout << "[WatcherSource " << now() << "]"
274  << " File to process: '" << fileName << "'\n";
275  }
276  }
277  while (!feof(s))
278  fgetc(s);
279  pclose(s);
280  if (filesInQueue_.empty()) {
281  if (!waiting) {
282  cout << "[WatcherSource " << now() << "]"
283  << " No file found. Waiting for new file...\n";
284  cout << flush;
285  waiting = true;
286  gettimeofday(&waitStart, nullptr);
287  } else if (!firstWait) {
288  timeval t;
289  gettimeofday(&t, nullptr);
290  float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
291  if ((timeOut_ >= 0) && (dt > timeOut_)) {
292  cout << "[WatcherSource " << now() << "]"
293  << " Having waited for new file for " << (int)dt << " sec. "
294  << "Timeout exceeded. Exits.\n";
295  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
296  end_ = true;
297  break;
298  }
299  }
300  }
301  sleep(1);
302  } //end of file queue update
303  firstWait = false;
304  free(lineptr);
305  lineptr = nullptr;
306 
307  while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
308  fileName_ = filesInQueue_.front();
309  filesInQueue_.pop_front();
310  int fd = open(fileName_.c_str(), 0);
311  if (fd != 0) {
312  struct stat buf;
313  off_t size = -1;
314  //check that file transfer is finished, by monitoring its size:
315  time_t t = time(nullptr);
316  for (;;) {
317  fstat(fd, &buf);
318  if (verbosity_)
319  cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
320  if (buf.st_size == size)
321  break;
322  else
323  size = buf.st_size;
324  if (difftime(t, buf.st_mtime) > 60)
325  break; //file older then 1 min=> tansfer must be finished
326  sleep(1);
327  }
328 
329  if (fd != 0 && buf.st_size == 0) { //file is empty. streamer reader
330  // does not like empty file=> skip it
331  stringstream c;
332  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
333  if (verbosity_)
334  cout << "[WatcherSource " << now() << "]"
335  << " Excuting " << c.str() << "\n";
336  int i = system(c.str().c_str());
337  if (i != 0) {
338  //throw cms::Exception("WatcherSource")
339  cout << "[WatcherSource " << now() << "] "
340  << "Failed to move empty file '" << fileName_ << "'"
341  << " to corrupted directory '" << corruptedDir_ << "'\n";
342  }
343  continue;
344  }
345 
346  close(fd);
347 
348  vector<char> buf1(fileName_.size() + 1);
349  copy(fileName_.begin(), fileName_.end(), buf1.begin());
350  buf1[buf1.size() - 1] = 0;
351 
352  vector<char> buf2(fileName_.size() + 1);
353  copy(fileName_.begin(), fileName_.end(), buf2.begin());
354  buf2[buf1.size() - 1] = 0;
355 
356  string dirnam(dirname(&buf1[0]));
357  string filenam(basename(&buf2[0]));
358 
359  string dest = inprocessDir_ + "/" + filenam;
360 
361  if (verbosity_)
362  cout << "[WatcherSource " << now() << "]"
363  << " Moving file " << fileName_ << " to " << dest << "\n";
364 
365  stringstream c;
366  c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest << "/.\"";
367 
368  if (0 != rename(fileName_.c_str(), dest.c_str())) {
369  //if(0!=system(c.str().c_str())){
370  throw cms::Exception("WatcherSource")
371  << "Failed to move file '" << fileName_ << "' "
372  << "to processing directory " << inprocessDir_ << ": " << strerror(errno);
373  }
374 
375  fileName_ = dest;
376 
377  cout << "[WatcherSource " << now() << "]"
378  << " Opening file " << fileName_ << "\n"
379  << flush;
380  streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
381 
382  ofstream f(".watcherfile");
383  f << fileName_;
384  } else {
385  cout << "[WatcherSource " << now() << "]"
386  << " Failed to open file " << fileName_ << endl;
387  }
388  } //loop on file queue to find one file which opening succeeded
389  }
390  return streamerInputFile_.get();
391 }
392 
394  if (streamerInputFile_.get() == nullptr)
395  return;
396  //delete the streamer input file:
397  streamerInputFile_.reset();
398  stringstream cmd;
399  //TODO: validation of processDir
400  cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
401  if (verbosity_)
402  cout << "[WatcherSource " << now() << "]"
403  << " Excuting " << cmd.str() << "\n";
404  int i = system(cmd.str().c_str());
405  if (i != 0) {
406  throw cms::Exception("WatcherSource") << "Failed to move processed file '" << fileName_ << "'"
407  << " to processed directory '" << processedDir_ << "'\n";
408  //Stop further processing to prevent endless loop:
409  end_ = true;
410  }
411  cout << flush;
412 }
mps_setup.cmd
list cmd
Definition: mps_setup.py:244
WatcherStreamFileReader::getNextEvent
const EventMsgView * getNextEvent()
Definition: WatcherStreamFileReader.cc:210
WatcherStreamFileReader::~WatcherStreamFileReader
~WatcherStreamFileReader()
Definition: WatcherStreamFileReader.cc:186
WatcherStreamFileReader::fileName_
std::string fileName_
Definition: WatcherStreamFileReader.h:64
mps_fire.i
i
Definition: mps_fire.py:428
WatcherStreamFileReader::filePatterns_
std::vector< std::string > filePatterns_
Definition: WatcherStreamFileReader.h:46
WatcherStreamFileReader::closeFile
void closeFile()
Definition: WatcherStreamFileReader.cc:393
funct::false
false
Definition: Factorize.h:29
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
WatcherStreamFileReader::filesInQueue_
std::deque< std::string > filesInQueue_
Definition: WatcherStreamFileReader.h:70
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
heppy_check.dirs
dirs
Definition: heppy_check.py:26
WatcherStreamFileReader::newHeader
const bool newHeader()
Definition: WatcherStreamFileReader.cc:188
gather_cfg.cout
cout
Definition: gather_cfg.py:144
WatcherStreamFileReader::corruptedDir_
std::string corruptedDir_
Definition: WatcherStreamFileReader.h:58
WatcherStreamFileReader::inprocessDir_
std::string inprocessDir_
Definition: WatcherStreamFileReader.h:50
personalPlayback.fp
fp
Definition: personalPlayback.py:523
WatcherStreamFileReader::curDir_
std::string curDir_
Definition: WatcherStreamFileReader.h:78
edm::StreamerInputFile::Next::kEvent
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
rename
Definition: rename.py:1
Header::INIT
Definition: MsgHeader.h:15
EventMsgView
Definition: EventMessage.h:72
WatcherStreamFileReader::timeOut_
int timeOut_
Definition: WatcherStreamFileReader.h:68
WatcherStreamFileReader::verbosity_
int verbosity_
Definition: WatcherStreamFileReader.h:74
edm::StreamerInputFile
Definition: StreamerInputFile.h:19
BTVHLTOfflineSource_cfi.dirname
dirname
Definition: BTVHLTOfflineSource_cfi.py:7
dt
float dt
Definition: AMPTWrapper.h:136
ztee.fd
fd
Definition: ztee.py:136
alignCSCRings.s
s
Definition: alignCSCRings.py:92
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1119
WatcherStreamFileReader::WatcherStreamFileReader
WatcherStreamFileReader(edm::ParameterSet const &pset)
Definition: WatcherStreamFileReader.cc:114
WatcherStreamFileReader::fileListCmd_
std::string fileListCmd_
Definition: WatcherStreamFileReader.h:76
mps_fire.end
end
Definition: mps_fire.py:242
WatcherStreamFileReader::getHeader
const InitMsgView * getHeader()
Definition: WatcherStreamFileReader.cc:193
WatcherStreamFileReader::processedDir_
std::string processedDir_
Definition: WatcherStreamFileReader.h:54
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
StreamerInputFile.h
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
SIZE_MAX
#define SIZE_MAX
Definition: WatcherStreamFileReader.cc:29
SSIZE_MAX
#define SSIZE_MAX
Definition: WatcherStreamFileReader.cc:32
edm::ParameterSet
Definition: ParameterSet.h:47
WatcherStreamFileReader::getInputFile
edm::StreamerInputFile * getInputFile()
Definition: WatcherStreamFileReader.cc:226
now
static std::string now()
Definition: WatcherStreamFileReader.cc:100
createfilelist.int
int
Definition: createfilelist.py:10
dtResolutionTest_cfi.inputFile
inputFile
Definition: dtResolutionTest_cfi.py:14
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
MsgTools.h
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
WatcherStreamFileReader::streamerInputFile_
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
Definition: WatcherStreamFileReader.h:62
NULL
#define NULL
Definition: scimark2.h:8
std
Definition: JetResolutionObject.h:76
WatcherStreamFileReader.h
Exception
Definition: hltDiff.cc:246
WatcherStreamFileReader::end_
bool end_
Definition: WatcherStreamFileReader.h:72
Exception.h
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
mps_fire.result
result
Definition: mps_fire.py:311
cms::Exception
Definition: Exception.h:70
ntuplemaker.time
time
Definition: ntuplemaker.py:310
WatcherStreamFileReader::tokenFile_
std::string tokenFile_
Definition: WatcherStreamFileReader.h:66
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
mps_fire.dest
dest
Definition: mps_fire.py:179
DeadROC_duringRun.dir
dir
Definition: DeadROC_duringRun.py:23
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
InitMsgView
Definition: InitMessage.h:61
WatcherStreamFileReader::inputDir_
std::string inputDir_
Definition: WatcherStreamFileReader.h:42