CMS 3D CMS Logo

List of all members | Public Member Functions | Private Attributes
WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

Public Member Functions

void closeFile ()
 
const InitMsgViewgetHeader ()
 
edm::StreamerInputFilegetInputFile ()
 
const EventMsgViewgetNextEvent ()
 
const bool newHeader ()
 
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 
 ~WatcherStreamFileReader ()
 

Private Attributes

std::string corruptedDir_
 
std::string curDir_
 
bool end_
 
std::string fileListCmd_
 
std::string fileName_
 
std::vector< std::string > filePatterns_
 
std::deque< std::string > filesInQueue_
 
std::string inprocessDir_
 
std::string inputDir_
 
std::string processedDir_
 
std::unique_ptr< edm::StreamerInputFilestreamerInputFile_
 
int timeOut_
 
std::string tokenFile_
 
int verbosity_
 

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.

Constructor & Destructor Documentation

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 113 of file WatcherStreamFileReader.cc.

References corruptedDir_, gather_cfg::cout, curDir_, dir, heppy_check::dirs, Exception, f, fileListCmd_, filePatterns_, mps_fire::i, inprocessDir_, inputDir_, eostools::mkdir(), now(), processedDir_, trackingPlots::stat, and tokenFile_.

114  : inputDir_(pset.getParameter<std::string>("inputDir")),
115  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
116  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
117  processedDir_(pset.getParameter<std::string>("processedDir")),
118  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
119  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
120  timeOut_(pset.getParameter<int>("timeOutInSec")),
121  end_(false),
122  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
123  struct stat buf;
124  if (stat(tokenFile_.c_str(), &buf)) {
125  FILE* f = fopen(tokenFile_.c_str(), "w");
126  if (f) {
127  fclose(f);
128  } else {
129  throw cms::Exception("WatcherSource") << "Failed to create token file.";
130  }
131  }
132  vector<string> dirs;
133  dirs.push_back(inprocessDir_);
134  dirs.push_back(processedDir_);
135  dirs.push_back(corruptedDir_);
136 
137  for (unsigned i = 0; i < dirs.size(); ++i) {
138  const string& dir = dirs[i];
139  struct stat fileStat;
140  if (0 == stat(dir.c_str(), &fileStat)) {
141  if (!S_ISDIR(fileStat.st_mode)) {
142  throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
143  << " as expected.";
144  }
145  } else { //directory does not exists, let's try to create it
146  if (0 != mkdir(dir.c_str(), 0755)) {
147  throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
148  }
149  }
150  }
151 
152  std::stringstream fileListCmdBuf;
153  fileListCmdBuf.str("");
154  // fileListCmdBuf << "/bin/ls -rt " << inputDir_ << " | egrep '(";
155  //by default ls will sort the file alphabetically which will results
156  //in ordering the files in increasing LB number, which is the desired
157  //order.
158  // fileListCmdBuf << "/bin/ls " << inputDir_ << " | egrep '(";
159  fileListCmdBuf << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
160  //TODO: validate patternDir (see ;, &&, ||) and escape special character
161  if (filePatterns_.empty())
162  throw cms::Exception("WacherSource", "filePatterns parameter is empty");
163  char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
164  if (getcwd(curDir, sizeof(curDir)) == nullptr) {
165  throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
166  }
167  curDir_ = curDir;
168 
169  for (unsigned i = 0; i < filePatterns_.size(); ++i) {
170  if (i > 0)
171  fileListCmdBuf << "|";
172  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
173  // fileListCmdBuf << curDir << "/";
174  // }
175  fileListCmdBuf << filePatterns_[i];
176  }
177  fileListCmdBuf << ")' | sort";
178 
179  fileListCmd_ = fileListCmdBuf.str();
180 
181  cout << "[WatcherSource " << now() << "]"
182  << " Command to retrieve input files: " << fileListCmd_ << "\n";
183 }
std::vector< std::string > filePatterns_
double f[11][100]
static std::string now()
def mkdir(path)
Definition: eostools.py:251
dbl *** dir
Definition: mlp_gen.cc:35
WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 185 of file WatcherStreamFileReader.cc.

185 {}

Member Function Documentation

void WatcherStreamFileReader::closeFile ( )

Definition at line 392 of file WatcherStreamFileReader.cc.

References mps_setup::cmd, gather_cfg::cout, end_, Exception, fileName_, mps_fire::i, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent(), and Vispa.Main.Application.Application::tabCloseRequest().

392  {
393  if (streamerInputFile_.get() == nullptr)
394  return;
395  //delete the streamer input file:
396  streamerInputFile_.reset();
397  stringstream cmd;
398  //TODO: validation of processDir
399  cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
400  if (verbosity_)
401  cout << "[WatcherSource " << now() << "]"
402  << " Excuting " << cmd.str() << "\n";
403  int i = system(cmd.str().c_str());
404  if (i != 0) {
405  throw cms::Exception("WatcherSource") << "Failed to move processed file '" << fileName_ << "'"
406  << " to processed directory '" << processedDir_ << "'\n";
407  //Stop further processing to prevent endless loop:
408  end_ = true;
409  }
410  cout << flush;
411 }
static std::string now()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
list cmd
Definition: mps_setup.py:244
const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 192 of file WatcherStreamFileReader.cc.

References InitMsgView::code(), Exception, getInputFile(), RecoTauValidation_cfi::header, Header::INIT, analyzePatCleaning_cfg::inputFile, and edm::StreamerInputFile::startMessage().

192  {
194 
195  //TODO: shall better send an exception...
196  if (inputFile == nullptr) {
197  throw cms::Exception("WatcherSource") << "No input file found.";
198  }
199 
200  const InitMsgView* header = inputFile->startMessage();
201 
202  if (header->code() != Header::INIT) //INIT Msg
203  throw cms::Exception("readHeader", "WatcherStreamFileReader")
204  << "received wrong message type: expected INIT, got " << header->code() << "\n";
205 
206  return header;
207 }
InitMsgView const * startMessage() const
edm::StreamerInputFile * getInputFile()
uint32 code() const
Definition: InitMessage.h:65
edm::StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 225 of file WatcherStreamFileReader.cc.

References estimatePileup::basename, EnergyCorrector::c, popcon2dropbox::copy(), corruptedDir_, gather_cfg::cout, curDir_, mps_fire::dest, compare_using_db::dirname, dt, end_, Exception, f, fileListCmd_, MillePedeFileConverter_cfg::fileName, fileName_, filesInQueue_, mps_fire::i, inprocessDir_, inputDir_, createfilelist::int, gen::n, now(), alignCSCRings::s, findQualityFiles::size, trackingPlots::stat, streamerInputFile_, lumiQTWidget::t, ntuplemaker::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

225  {
226  char* lineptr = nullptr;
227  size_t n = 0;
228 
229  struct stat buf;
230 
231  if (stat(tokenFile_.c_str(), &buf) != 0) {
232  end_ = true;
233  }
234 
235  bool waiting = false;
236  static bool firstWait = true;
237  timeval waitStart;
238  //if no cached input file, look for new files until one is found:
239  if (!end_ && streamerInputFile_.get() == nullptr) {
240  fileName_.assign("");
241 
242  //check if we have file in the queue, if not look for new files:
243  while (filesInQueue_.empty()) {
244  if (stat(tokenFile_.c_str(), &buf) != 0) {
245  end_ = true;
246  break;
247  }
248  FILE* s = popen(fileListCmd_.c_str(), "r");
249  if (s == nullptr) {
250  throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
251  }
252 
253  ssize_t len;
254  while (!feof(s)) {
255  if ((len = getline(&lineptr, &n, s)) > 0) {
256  //remove end-of-line character:
257  lineptr[len - 1] = 0;
258  string fileName;
259  if (lineptr[0] != '/') {
260  if (!inputDir_.empty() && inputDir_[0] != '/') { //relative path
261  fileName.assign(curDir_);
262  fileName.append("/");
263  fileName.append(inputDir_);
264  } else {
265  fileName.assign(inputDir_);
266  }
267  fileName.append("/");
268  }
269  fileName.append(lineptr);
270  filesInQueue_.push_back(fileName);
271  if (verbosity_)
272  cout << "[WatcherSource " << now() << "]"
273  << " File to process: '" << fileName << "'\n";
274  }
275  }
276  while (!feof(s))
277  fgetc(s);
278  pclose(s);
279  if (filesInQueue_.empty()) {
280  if (!waiting) {
281  cout << "[WatcherSource " << now() << "]"
282  << " No file found. Waiting for new file...\n";
283  cout << flush;
284  waiting = true;
285  gettimeofday(&waitStart, nullptr);
286  } else if (!firstWait) {
287  timeval t;
288  gettimeofday(&t, nullptr);
289  float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
290  if ((timeOut_ >= 0) && (dt > timeOut_)) {
291  cout << "[WatcherSource " << now() << "]"
292  << " Having waited for new file for " << (int)dt << " sec. "
293  << "Timeout exceeded. Exits.\n";
294  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
295  end_ = true;
296  break;
297  }
298  }
299  }
300  sleep(1);
301  } //end of file queue update
302  firstWait = false;
303  free(lineptr);
304  lineptr = nullptr;
305 
306  while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
307  fileName_ = filesInQueue_.front();
308  filesInQueue_.pop_front();
309  int fd = open(fileName_.c_str(), 0);
310  if (fd != 0) {
311  struct stat buf;
312  off_t size = -1;
313  //check that file transfer is finished, by monitoring its size:
314  time_t t = time(nullptr);
315  for (;;) {
316  fstat(fd, &buf);
317  if (verbosity_)
318  cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
319  if (buf.st_size == size)
320  break;
321  else
322  size = buf.st_size;
323  if (difftime(t, buf.st_mtime) > 60)
324  break; //file older then 1 min=> tansfer must be finished
325  sleep(1);
326  }
327 
328  if (fd != 0 && buf.st_size == 0) { //file is empty. streamer reader
329  // does not like empty file=> skip it
330  stringstream c;
331  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
332  if (verbosity_)
333  cout << "[WatcherSource " << now() << "]"
334  << " Excuting " << c.str() << "\n";
335  int i = system(c.str().c_str());
336  if (i != 0) {
337  //throw cms::Exception("WatcherSource")
338  cout << "[WatcherSource " << now() << "] "
339  << "Failed to move empty file '" << fileName_ << "'"
340  << " to corrupted directory '" << corruptedDir_ << "'\n";
341  }
342  continue;
343  }
344 
345  close(fd);
346 
347  vector<char> buf1(fileName_.size() + 1);
348  copy(fileName_.begin(), fileName_.end(), buf1.begin());
349  buf1[buf1.size() - 1] = 0;
350 
351  vector<char> buf2(fileName_.size() + 1);
352  copy(fileName_.begin(), fileName_.end(), buf2.begin());
353  buf2[buf1.size() - 1] = 0;
354 
355  string dirnam(dirname(&buf1[0]));
356  string filenam(basename(&buf2[0]));
357 
358  string dest = inprocessDir_ + "/" + filenam;
359 
360  if (verbosity_)
361  cout << "[WatcherSource " << now() << "]"
362  << " Moving file " << fileName_ << " to " << dest << "\n";
363 
364  stringstream c;
365  c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest << "/.\"";
366 
367  if (0 != rename(fileName_.c_str(), dest.c_str())) {
368  //if(0!=system(c.str().c_str())){
369  throw cms::Exception("WatcherSource")
370  << "Failed to move file '" << fileName_ << "' "
371  << "to processing directory " << inprocessDir_ << ": " << strerror(errno);
372  }
373 
374  fileName_ = dest;
375 
376  cout << "[WatcherSource " << now() << "]"
377  << " Opening file " << fileName_ << "\n"
378  << flush;
379  streamerInputFile_ = unique_ptr<edm::StreamerInputFile>(new edm::StreamerInputFile(fileName_));
380 
381  ofstream f(".watcherfile");
382  f << fileName_;
383  } else {
384  cout << "[WatcherSource " << now() << "]"
385  << " Failed to open file " << fileName_ << endl;
386  }
387  } //loop on file queue to find one file which opening succeeded
388  }
389  return streamerInputFile_.get();
390 }
size
Write out results.
float dt
Definition: AMPTWrapper.h:126
Definition: rename.py:1
def copy(args, dbName)
std::deque< std::string > filesInQueue_
double f[11][100]
static std::string now()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 209 of file WatcherStreamFileReader.cc.

References closeFile(), edm::StreamerInputFile::currentRecord(), end_, getInputFile(), analyzePatCleaning_cfg::inputFile, and edm::StreamerInputFile::next().

209  {
210  if (end_) {
211  closeFile();
212  return nullptr;
213  }
214 
216 
217  //go to next input file, till no new event is found
218  while ((inputFile = getInputFile()) != nullptr && inputFile->next() == 0) {
219  closeFile();
220  }
221 
222  return inputFile == nullptr ? nullptr : inputFile->currentRecord();
223 }
edm::StreamerInputFile * getInputFile()
EventMsgView const * currentRecord() const
const bool WatcherStreamFileReader::newHeader ( )

Member Data Documentation

std::string WatcherStreamFileReader::corruptedDir_
private

Directory where file must be moved if file is unreadble (e.g empty size)

Definition at line 58 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::curDir_
private

Definition at line 78 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

bool WatcherStreamFileReader::end_
private

Definition at line 72 of file WatcherStreamFileReader.h.

Referenced by closeFile(), getInputFile(), and getNextEvent().

std::string WatcherStreamFileReader::fileListCmd_
private

Definition at line 76 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::fileName_
private

Definition at line 64 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

std::vector<std::string> WatcherStreamFileReader::filePatterns_
private

Streamer file name pattern list

Definition at line 46 of file WatcherStreamFileReader.h.

Referenced by WatcherStreamFileReader().

std::deque<std::string> WatcherStreamFileReader::filesInQueue_
private

Definition at line 70 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::inprocessDir_
private

Directory where file are moved during processing

Definition at line 50 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::inputDir_
private

Directory to look for streamer files

Definition at line 42 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::processedDir_
private

Directory where file must be moved once processed

Definition at line 54 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and WatcherStreamFileReader().

std::unique_ptr<edm::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_
private

Cached input file stream

Definition at line 62 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

int WatcherStreamFileReader::timeOut_
private

Definition at line 68 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::tokenFile_
private

Definition at line 66 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

int WatcherStreamFileReader::verbosity_
private

Definition at line 74 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().