CMS 3D CMS Logo

List of all members | Public Member Functions | Private Attributes
WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

Public Member Functions

void closeFile ()
 
const InitMsgView * getHeader ()
 
edm::StreamerInputFile * getInputFile ()
 
const EventMsgView * getNextEvent ()
 
const bool newHeader ()
 
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 
 ~WatcherStreamFileReader ()
 

Private Attributes

std::string corruptedDir_
 
bool end_
 
std::string fileName_
 
std::vector< std::string > filePatterns_
 
std::deque< std::string > filesInQueue_
 
std::string inprocessDir_
 
std::string inputDir_
 
std::string processedDir_
 
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
 
int timeOut_
 
std::string tokenFile_
 
int verbosity_
 

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.

Constructor & Destructor Documentation

◆ WatcherStreamFileReader()

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 114 of file WatcherStreamFileReader.cc.

References visDQMUpload::buf, corruptedDir_, DeadROC_duringRun::dir, heppy_check::dirs, Exception, f, mps_fire::i, inprocessDir_, eostools::mkdir(), processedDir_, edm_modernize_messagelogger::stat, and tokenFile_.

115  : inputDir_(pset.getParameter<std::string>("inputDir")),
116  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
117  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
118  processedDir_(pset.getParameter<std::string>("processedDir")),
119  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
120  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
121  timeOut_(pset.getParameter<int>("timeOutInSec")),
122  end_(false),
123  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
124  struct stat buf;
125  if (stat(tokenFile_.c_str(), &buf)) {
126  FILE* f = fopen(tokenFile_.c_str(), "w");
127  if (f) {
128  fclose(f);
129  } else {
130  throw cms::Exception("WatcherSource") << "Failed to create token file.";
131  }
132  }
133  vector<string> dirs;
134  dirs.push_back(inprocessDir_);
135  dirs.push_back(processedDir_);
136  dirs.push_back(corruptedDir_);
137 
138  for (unsigned i = 0; i < dirs.size(); ++i) {
139  const string& dir = dirs[i];
140  struct stat fileStat;
141  if (0 == stat(dir.c_str(), &fileStat)) {
142  if (!S_ISDIR(fileStat.st_mode)) {
143  throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
144  << " as expected.";
145  }
146  } else { //directory does not exists, let's try to create it
147  if (0 != mkdir(dir.c_str(), 0755)) {
148  throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
149  }
150  }
151  }
152 }
std::vector< std::string > filePatterns_
double f[11][100]
def mkdir(path)
Definition: eostools.py:251

◆ ~WatcherStreamFileReader()

WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 154 of file WatcherStreamFileReader.cc.

154 {}

Member Function Documentation

◆ closeFile()

void WatcherStreamFileReader::closeFile ( )

Definition at line 395 of file WatcherStreamFileReader.cc.

References visDQMUpload::buf, mps_setup::cmd, filterCSVwithJSON::copy, gather_cfg::cout, mps_fire::dest, end_, Exception, fileName_, mps_fire::i, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent().

395  {
396  if (streamerInputFile_.get() == nullptr)
397  return;
398  //delete the streamer input file:
399  streamerInputFile_.reset();
400  stringstream cmd;
401  //TODO: validation of processDir
402  //cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
403  //if(verbosity_) cout << "[WatcherSource " << now() << "]"
404  //<< " Executing (in closeFile())" << cmd.str() << "\n";
405  //int i = system(cmd.str().c_str());
406  //cout << "move command done at " << now() << "\n";
407  vector<char> buf(fileName_.size() + 1);
408  copy(fileName_.begin(), fileName_.end(), buf.begin());
409  buf[buf.size() - 1] = 0;
410  string dest = processedDir_ + "/" + basename(&buf[0]);
411  if (verbosity_)
412  cout << "[WatcherSource " << now() << "]"
413  << " Moving " << fileName_ << " to " << dest << "... ";
414  int i = rename(fileName_.c_str(), dest.c_str());
415  if (i != 0) {
416  throw cms::Exception("WatcherSource")
417  << "Failed to move processed file '" << fileName_ << "'"
418  << " to processed directory '" << processedDir_ << ": " << strerror(errno) << " (Error no " << errno << ")";
419 
420  //Stop further processing to prevent endless loop:
421  end_ = true;
422  }
423  if (verbosity_)
424  cout << "Done at " << now() << "\n";
425 
426  // cout << flush;
427 }
Definition: rename.py:1
static std::string now()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
list cmd
Definition: mps_setup.py:244

◆ getHeader()

const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 161 of file WatcherStreamFileReader.cc.

References Exception, getInputFile(), RecoTauValidation_cfi::header, Header::INIT, and makeListRunsInFiles::inputFile.

161  {
163 
164  //TODO: shall better send an exception...
165  if (inputFile == nullptr) {
166  throw cms::Exception("WatcherSource") << "No input file found.";
167  }
168 
169  const InitMsgView* header = inputFile->startMessage();
170 
171  if (header->code() != Header::INIT) //INIT Msg
172  throw cms::Exception("readHeader", "WatcherStreamFileReader")
173  << "received wrong message type: expected INIT, got " << header->code() << "\n";
174 
175  return header;
176 }
edm::StreamerInputFile * getInputFile()

◆ getInputFile()

edm::StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 194 of file WatcherStreamFileReader.cc.

References visDQMUpload::buf, HltBtagPostValidation_cff::c, mps_setup::cmd, filterCSVwithJSON::copy, corruptedDir_, gather_cfg::cout, mps_fire::dest, BTVHLTOfflineSource_cfi::dirname, dt, end_, Exception, f, ztee::fd, MillePedeFileConverter_cfg::fileName, fileName_, filePatterns_, filesInQueue_, mps_fire::i, inprocessDir_, inputDir_, createfilelist::int, dqmiodumpmetadata::n, now(), alignCSCRings::s, findQualityFiles::size, edm_modernize_messagelogger::stat, streamerInputFile_, submitPVValidationJobs::t, protons_cff::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

194  {
195  char* lineptr = nullptr;
196  size_t n = 0;
197  static stringstream cmd;
198  static bool cmdSet = false;
199  static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
200 
201  if (!cmdSet) {
202  cmd.str("");
203  // cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
204  //by default ls will sort the file alphabetically which will results
205  //in ordering the files in increasing LB number, which is the desired
206  //order.
207  // cmd << "/bin/ls " << inputDir_ << " | egrep '(";
208  cmd << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
209  //TODO: validate patternDir (see ;, &&, ||) and escape special character
210  if (filePatterns_.empty())
211  return nullptr;
212  if (getcwd(curDir, sizeof(curDir)) == nullptr) {
213  throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
214  }
215 
216  for (unsigned i = 0; i < filePatterns_.size(); ++i) {
217  if (i > 0)
218  cmd << "|";
219  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
220  // cmd << curDir << "/";
221  // }
222  cmd << filePatterns_[i];
223  }
224  cmd << ")' | sort";
225 
226  cout << "[WatcherSource " << now() << "]"
227  << " Command to retrieve input files: " << cmd.str() << "\n";
228  cmdSet = true;
229  }
230 
231  struct stat buf;
232 
233  if (stat(tokenFile_.c_str(), &buf) != 0) {
234  end_ = true;
235  }
236 
237  bool waiting = false;
238  static bool firstWait = true;
239  timeval waitStart;
240  //if no cached input file, look for new files until one is found:
241  if (!end_ && streamerInputFile_.get() == nullptr) {
242  fileName_.assign("");
243 
244  //check if we have file in the queue, if not look for new files:
245  while (filesInQueue_.empty()) {
246  if (stat(tokenFile_.c_str(), &buf) != 0) {
247  end_ = true;
248  break;
249  }
250  FILE* s = popen(cmd.str().c_str(), "r");
251  if (s == nullptr) {
252  throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
253  }
254 
255  ssize_t len;
256  while (!feof(s)) {
257  if ((len = getline(&lineptr, &n, s)) > 0) {
258  //remove end-of-line character:
259  lineptr[len - 1] = 0;
260  string fileName;
261  if (lineptr[0] != '/') {
262  if (!inputDir_.empty() && inputDir_[0] != '/') { //relative path
263  fileName.assign(curDir);
264  fileName.append("/");
265  fileName.append(inputDir_);
266  } else {
267  fileName.assign(inputDir_);
268  }
269  fileName.append("/");
270  }
271  fileName.append(lineptr);
272  filesInQueue_.push_back(fileName);
273  if (verbosity_)
274  cout << "[WatcherSource " << now() << "]"
275  << " File to process: '" << fileName << "'\n";
276  }
277  }
278  while (!feof(s))
279  fgetc(s);
280  pclose(s);
281  if (filesInQueue_.empty()) {
282  if (!waiting) {
283  cout << "[WatcherSource " << now() << "]"
284  << " No file found. Waiting for new file...\n";
285  cout << flush;
286  waiting = true;
287  gettimeofday(&waitStart, nullptr);
288  } else if (!firstWait) {
289  timeval t;
290  gettimeofday(&t, nullptr);
291  float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
292  if ((timeOut_ >= 0) && (dt > timeOut_)) {
293  cout << "[WatcherSource " << now() << "]"
294  << " Having waited for new file for " << (int)dt << " sec. "
295  << "Timeout exceeded. Exits.\n";
296  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
297  end_ = true;
298  break;
299  }
300  }
301  }
302  sleep(1);
303  } //end of file queue update
304  firstWait = false;
305  free(lineptr);
306  lineptr = nullptr;
307 
308  while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
309  fileName_ = filesInQueue_.front();
310  filesInQueue_.pop_front();
311  int fd = open(fileName_.c_str(), 0);
312  if (fd != 0) {
313  struct stat buf;
314  off_t size = -1;
315  //check that file transfer is finished, by monitoring its size:
316  time_t t = time(nullptr);
317  for (;;) {
318  fstat(fd, &buf);
319  if (verbosity_)
320  cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
321  if (buf.st_size == size)
322  break;
323  else
324  size = buf.st_size;
325  if (difftime(t, buf.st_mtime) > 60)
326  break; //file older then 1 min=> tansfer must be finished
327  sleep(1);
328  }
329 
330  if (fd != 0 && buf.st_size == 0) { //file is empty. streamer reader
331  // does not like empty file=> skip it
332  stringstream c;
333  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
334  if (verbosity_)
335  cout << "[WatcherSource " << now() << "]"
336  << " Executing " << c.str() << "\n";
337  int i = system(c.str().c_str());
338  if (i != 0) {
339  //throw cms::Exception("WatcherSource")
340  cout << "[WatcherSource " << now() << "] "
341  << "Failed to move empty file '" << fileName_ << "'"
342  << " to corrupted directory '" << corruptedDir_ << "'\n";
343  }
344  continue;
345  }
346 
347  close(fd);
348 
349  vector<char> buf1(fileName_.size() + 1);
350  copy(fileName_.begin(), fileName_.end(), buf1.begin());
351  buf1[buf1.size() - 1] = 0;
352 
353  vector<char> buf2(fileName_.size() + 1);
354  copy(fileName_.begin(), fileName_.end(), buf2.begin());
355  buf2[buf2.size() - 1] = 0;
356 
357  string dirnam(dirname(&buf1[0]));
358  string filenam(basename(&buf2[0]));
359 
360  string dest = inprocessDir_ + "/" + filenam;
361 
362  if (verbosity_)
363  cout << "[WatcherSource " << now() << "]"
364  << " Moving file " << fileName_ << " to " << dest << "\n";
365 
366  // stringstream c;
367  //c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest
368  // << "/.\"";
369 
370  if (0 != rename(fileName_.c_str(), dest.c_str())) {
371  //if(0!=system(c.str().c_str())){
372  throw cms::Exception("WatcherSource") << "Failed to move file '" << fileName_ << "' "
373  << "to processing directory " << inprocessDir_ << ": "
374  << strerror(errno) << " (Error no " << errno << ")";
375  }
376 
377  fileName_ = dest;
378 
379  cout << "[WatcherSource " << now() << "]"
380  << " Opening file " << fileName_ << "\n"
381  << flush;
382  streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
383 
384  ofstream f(".watcherfile");
385  f << fileName_;
386  } else {
387  cout << "[WatcherSource " << now() << "]"
388  << " Failed to open file " << fileName_ << endl;
389  }
390  } //loop on file queue to find one file which opening succeeded
391  }
392  return streamerInputFile_.get();
393 }
std::vector< std::string > filePatterns_
size
Write out results.
float dt
Definition: AMPTWrapper.h:136
Definition: rename.py:1
std::deque< std::string > filesInQueue_
double f[11][100]
static std::string now()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
list cmd
Definition: mps_setup.py:244
fd
Definition: ztee.py:136

◆ getNextEvent()

const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 178 of file WatcherStreamFileReader.cc.

References closeFile(), end_, getInputFile(), makeListRunsInFiles::inputFile, and edm::StreamerInputFile::kEvent.

178  {
179  if (end_) {
180  closeFile();
181  return nullptr;
182  }
183 
185 
186  //go to next input file, till no new event is found
187  while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
188  closeFile();
189  }
190 
191  return inputFile == nullptr ? nullptr : inputFile->currentRecord();
192 }
edm::StreamerInputFile * getInputFile()

◆ newHeader()

const bool WatcherStreamFileReader::newHeader ( )

Definition at line 156 of file WatcherStreamFileReader.cc.

References getInputFile(), and makeListRunsInFiles::inputFile.

156  {
158  return inputFile ? inputFile->newHeader() : false;
159 }
edm::StreamerInputFile * getInputFile()

Member Data Documentation

◆ corruptedDir_

std::string WatcherStreamFileReader::corruptedDir_
private

Directory where a file must be moved if it is unreadable (e.g. if it is empty)

Definition at line 58 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ end_

bool WatcherStreamFileReader::end_
private

Definition at line 72 of file WatcherStreamFileReader.h.

Referenced by closeFile(), getInputFile(), and getNextEvent().

◆ fileName_

std::string WatcherStreamFileReader::fileName_
private

Definition at line 64 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

◆ filePatterns_

std::vector<std::string> WatcherStreamFileReader::filePatterns_
private

Streamer file name pattern list

Definition at line 46 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ filesInQueue_

std::deque<std::string> WatcherStreamFileReader::filesInQueue_
private

Definition at line 70 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ inprocessDir_

std::string WatcherStreamFileReader::inprocessDir_
private

Directory where files are moved during processing

Definition at line 50 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ inputDir_

std::string WatcherStreamFileReader::inputDir_
private

Directory to look for streamer files

Definition at line 42 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ processedDir_

std::string WatcherStreamFileReader::processedDir_
private

Directory where a file must be moved once processed

Definition at line 54 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and WatcherStreamFileReader().

◆ streamerInputFile_

std::unique_ptr<edm::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_
private

Cached input file stream

Definition at line 62 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

◆ timeOut_

int WatcherStreamFileReader::timeOut_
private

Definition at line 68 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ tokenFile_

std::string WatcherStreamFileReader::tokenFile_
private

Definition at line 66 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ verbosity_

int WatcherStreamFileReader::verbosity_
private

Definition at line 74 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().