CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

Public Member Functions

void closeFile ()
 
const edm::streamer::InitMsgView * getHeader ()
 
edm::streamer::StreamerInputFile * getInputFile ()
 
const edm::streamer::EventMsgView * getNextEvent ()
 
const bool newHeader ()
 
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 
 ~WatcherStreamFileReader ()
 

Private Member Functions

void moveJustReadFile ()
 

Private Attributes

std::string corruptedDir_
 
bool end_
 
std::string fileName_
 
std::vector< std::string > filePatterns_
 
std::deque< std::string > filesInQueue_
 
std::string inprocessDir_
 
std::string inputDir_
 
std::string processedDir_
 
std::unique_ptr< edm::streamer::StreamerInputFile > streamerInputFile_
 
int timeOut_
 
std::string tokenFile_
 
int verbosity_
 

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.

Constructor & Destructor Documentation

◆ WatcherStreamFileReader()

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 115 of file WatcherStreamFileReader.cc.

References visDQMUpload::buf, corruptedDir_, DeadROC_duringRun::dir, heppy_check::dirs, Exception, f, mps_fire::i, inprocessDir_, eostools::mkdir(), processedDir_, edm_modernize_messagelogger::stat, and tokenFile_.

116  : inputDir_(pset.getParameter<std::string>("inputDir")),
117  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
118  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
119  processedDir_(pset.getParameter<std::string>("processedDir")),
120  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
121  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
122  timeOut_(pset.getParameter<int>("timeOutInSec")),
123  end_(false),
124  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
125  struct stat buf;
126  if (stat(tokenFile_.c_str(), &buf)) {
127  FILE* f = fopen(tokenFile_.c_str(), "w");
128  if (f) {
129  fclose(f);
130  } else {
131  throw cms::Exception("WatcherSource") << "Failed to create token file.";
132  }
133  }
134  vector<string> dirs;
135  dirs.push_back(inprocessDir_);
136  dirs.push_back(processedDir_);
137  dirs.push_back(corruptedDir_);
138 
139  for (unsigned i = 0; i < dirs.size(); ++i) {
140  const string& dir = dirs[i];
141  struct stat fileStat;
142  if (0 == stat(dir.c_str(), &fileStat)) {
143  if (!S_ISDIR(fileStat.st_mode)) {
144  throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
145  << " as expected.";
146  }
147  } else { //directory does not exists, let's try to create it
148  if (0 != mkdir(dir.c_str(), 0755)) {
149  throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
150  }
151  }
152  }
153 }
std::vector< std::string > filePatterns_
double f[11][100]
def mkdir(path)
Definition: eostools.py:251

◆ ~WatcherStreamFileReader()

WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 155 of file WatcherStreamFileReader.cc.

155 {}

Member Function Documentation

◆ closeFile()

void WatcherStreamFileReader::closeFile ( )

Definition at line 392 of file WatcherStreamFileReader.cc.

392 {}

◆ getHeader()

const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 159 of file WatcherStreamFileReader.cc.

References Exception, getInputFile(), RecoTauValidation_cfi::header, and makeListRunsInFiles::inputFile.

159  {
161 
162  //TODO: shall better send an exception...
163  if (inputFile == nullptr) {
164  throw cms::Exception("WatcherSource") << "No input file found.";
165  }
166 
167  const InitMsgView* header = inputFile->startMessage();
168 
169  if (header->code() != Header::INIT) //INIT Msg
170  throw cms::Exception("readHeader", "WatcherStreamFileReader")
171  << "received wrong message type: expected INIT, got " << header->code() << "\n";
172 
173  return header;
174 }
edm::streamer::StreamerInputFile * getInputFile()

◆ getInputFile()

StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 191 of file WatcherStreamFileReader.cc.

References visDQMUpload::buf, DummyCfis::c, mps_setup::cmd, filterCSVwithJSON::copy, corruptedDir_, gather_cfg::cout, mps_fire::dest, BTVHLTOfflineSource_cfi::dirname, dt, end_, Exception, f, ztee::fd, MillePedeFileConverter_cfg::fileName, fileName_, filePatterns_, filesInQueue_, free(), mps_fire::i, inprocessDir_, inputDir_, createfilelist::int, dqmiodumpmetadata::n, now(), alignCSCRings::s, findQualityFiles::size, edm_modernize_messagelogger::stat, streamerInputFile_, submitPVValidationJobs::t, hcalRecHitTable_cff::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

191  {
192  char* lineptr = nullptr;
193  size_t n = 0;
194  static stringstream cmd;
195  static bool cmdSet = false;
196  static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
197 
198  if (!cmdSet) {
199  cmd.str("");
200  // cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
201  //by default ls will sort the file alphabetically which will results
202  //in ordering the files in increasing LB number, which is the desired
203  //order.
204  // cmd << "/bin/ls " << inputDir_ << " | egrep '(";
205  cmd << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
206  //TODO: validate patternDir (see ;, &&, ||) and escape special character
207  if (filePatterns_.empty())
208  return nullptr;
209  if (getcwd(curDir, sizeof(curDir)) == nullptr) {
210  throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
211  }
212 
213  for (unsigned i = 0; i < filePatterns_.size(); ++i) {
214  if (i > 0)
215  cmd << "|";
216  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
217  // cmd << curDir << "/";
218  // }
219  cmd << filePatterns_[i];
220  }
221  cmd << ")' | sort";
222 
223  cout << "[WatcherSource " << now() << "]"
224  << " Command to retrieve input files: " << cmd.str() << "\n";
225  cmdSet = true;
226  }
227 
228  struct stat buf;
229 
230  if (stat(tokenFile_.c_str(), &buf) != 0) {
231  end_ = true;
232  }
233 
234  bool waiting = false;
235  static bool firstWait = true;
236  timeval waitStart;
237  //if no cached input file, look for new files until one is found:
238  if (!end_ && streamerInputFile_.get() == nullptr) {
239  fileName_.assign("");
240 
241  //check if we have file in the queue, if not look for new files:
242  while (filesInQueue_.empty()) {
243  if (stat(tokenFile_.c_str(), &buf) != 0) {
244  end_ = true;
245  break;
246  }
247  FILE* s = popen(cmd.str().c_str(), "r");
248  if (s == nullptr) {
249  throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
250  }
251 
252  ssize_t len;
253  while (!feof(s)) {
254  if ((len = getline(&lineptr, &n, s)) > 0) {
255  //remove end-of-line character:
256  lineptr[len - 1] = 0;
257  string fileName;
258  if (lineptr[0] != '/') {
259  if (!inputDir_.empty() && inputDir_[0] != '/') { //relative path
260  fileName.assign(curDir);
261  fileName.append("/");
262  fileName.append(inputDir_);
263  } else {
264  fileName.assign(inputDir_);
265  }
266  fileName.append("/");
267  }
268  fileName.append(lineptr);
269  filesInQueue_.push_back(fileName);
270  if (verbosity_)
271  cout << "[WatcherSource " << now() << "]"
272  << " File to process: '" << fileName << "'\n";
273  }
274  }
275  while (!feof(s))
276  fgetc(s);
277  pclose(s);
278  if (filesInQueue_.empty()) {
279  if (!waiting) {
280  cout << "[WatcherSource " << now() << "]"
281  << " No file found. Waiting for new file...\n";
282  cout << flush;
283  waiting = true;
284  gettimeofday(&waitStart, nullptr);
285  } else if (!firstWait) {
286  timeval t;
287  gettimeofday(&t, nullptr);
288  float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
289  if ((timeOut_ >= 0) && (dt > timeOut_)) {
290  cout << "[WatcherSource " << now() << "]"
291  << " Having waited for new file for " << (int)dt << " sec. "
292  << "Timeout exceeded. Exits.\n";
293  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
294  end_ = true;
295  break;
296  }
297  }
298  }
299  sleep(1);
300  } //end of file queue update
301  firstWait = false;
302  free(lineptr);
303  lineptr = nullptr;
304 
305  while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
306  fileName_ = filesInQueue_.front();
307  filesInQueue_.pop_front();
308  int fd = open(fileName_.c_str(), 0);
309  if (fd != 0) {
310  struct stat buf;
311  off_t size = -1;
312  //check that file transfer is finished, by monitoring its size:
313  time_t t = time(nullptr);
314  for (;;) {
315  fstat(fd, &buf);
316  if (verbosity_)
317  cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
318  if (buf.st_size == size)
319  break;
320  else
321  size = buf.st_size;
322  if (difftime(t, buf.st_mtime) > 60)
323  break; //file older then 1 min=> tansfer must be finished
324  sleep(1);
325  }
326 
327  if (fd != 0 && buf.st_size == 0) { //file is empty. streamer reader
328  // does not like empty file=> skip it
329  stringstream c;
330  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
331  if (verbosity_)
332  cout << "[WatcherSource " << now() << "]"
333  << " Executing " << c.str() << "\n";
334  int i = system(c.str().c_str());
335  if (i != 0) {
336  //throw cms::Exception("WatcherSource")
337  cout << "[WatcherSource " << now() << "] "
338  << "Failed to move empty file '" << fileName_ << "'"
339  << " to corrupted directory '" << corruptedDir_ << "'\n";
340  }
341  continue;
342  }
343 
344  close(fd);
345 
346  vector<char> buf1(fileName_.size() + 1);
347  copy(fileName_.begin(), fileName_.end(), buf1.begin());
348  buf1[buf1.size() - 1] = 0;
349 
350  vector<char> buf2(fileName_.size() + 1);
351  copy(fileName_.begin(), fileName_.end(), buf2.begin());
352  buf2[buf2.size() - 1] = 0;
353 
354  string dirnam(dirname(&buf1[0]));
355  string filenam(basename(&buf2[0]));
356 
357  string dest = inprocessDir_ + "/" + filenam;
358 
359  if (verbosity_)
360  cout << "[WatcherSource " << now() << "]"
361  << " Moving file " << fileName_ << " to " << dest << "\n";
362 
363  // stringstream c;
364  //c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest
365  // << "/.\"";
366 
367  if (0 != rename(fileName_.c_str(), dest.c_str())) {
368  //if(0!=system(c.str().c_str())){
369  throw cms::Exception("WatcherSource") << "Failed to move file '" << fileName_ << "' "
370  << "to processing directory " << inprocessDir_ << ": "
371  << strerror(errno) << " (Error no " << errno << ")";
372  }
373 
374  fileName_ = dest;
375 
376  cout << "[WatcherSource " << now() << "]"
377  << " Opening file " << fileName_ << "\n"
378  << flush;
379  streamerInputFile_ = std::make_unique<StreamerInputFile>(fileName_);
380 
381  ofstream f(".watcherfile");
382  f << fileName_;
383  } else {
384  cout << "[WatcherSource " << now() << "]"
385  << " Failed to open file " << fileName_ << endl;
386  }
387  } //loop on file queue to find one file which opening succeeded
388  }
389  return streamerInputFile_.get();
390 }
std::vector< std::string > filePatterns_
size
Write out results.
float dt
Definition: AMPTWrapper.h:136
Definition: rename.py:1
std::deque< std::string > filesInQueue_
std::unique_ptr< edm::streamer::StreamerInputFile > streamerInputFile_
void free(void *ptr) noexcept
double f[11][100]
static std::string now()
list cmd
Definition: mps_setup.py:244
fd
Definition: ztee.py:136

◆ getNextEvent()

const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 176 of file WatcherStreamFileReader.cc.

References end_, getInputFile(), makeListRunsInFiles::inputFile, edm::shared_memory::channel_names::kStop, and moveJustReadFile().

176  {
177  if (end_) {
179  return nullptr;
180  }
181 
183  if ((inputFile = getInputFile()) != nullptr and inputFile->next() == StreamerInputFile::Next::kStop) {
185  return nullptr;
186  }
187 
188  return inputFile == nullptr ? nullptr : inputFile->currentRecord();
189 }
constexpr char const *const kStop
Definition: channel_names.h:34
edm::streamer::StreamerInputFile * getInputFile()

◆ moveJustReadFile()

void WatcherStreamFileReader::moveJustReadFile ( )
private

Definition at line 394 of file WatcherStreamFileReader.cc.

References visDQMUpload::buf, mps_setup::cmd, filterCSVwithJSON::copy, gather_cfg::cout, mps_fire::dest, end_, Exception, fileName_, mps_fire::i, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent().

394  {
395  if (streamerInputFile_.get() == nullptr)
396  return;
397  //delete the streamer input file:
398  streamerInputFile_.reset();
399  stringstream cmd;
400  //TODO: validation of processDir
401  //cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
402  //if(verbosity_) cout << "[WatcherSource " << now() << "]"
403  //<< " Executing (in closeFile())" << cmd.str() << "\n";
404  //int i = system(cmd.str().c_str());
405  //cout << "move command done at " << now() << "\n";
406  vector<char> buf(fileName_.size() + 1);
407  copy(fileName_.begin(), fileName_.end(), buf.begin());
408  buf[buf.size() - 1] = 0;
409  string dest = processedDir_ + "/" + basename(&buf[0]);
410  if (verbosity_)
411  cout << "[WatcherSource " << now() << "]"
412  << " Moving " << fileName_ << " to " << dest << "... ";
413  int i = rename(fileName_.c_str(), dest.c_str());
414  if (i != 0) {
415  throw cms::Exception("WatcherSource")
416  << "Failed to move processed file '" << fileName_ << "'"
417  << " to processed directory '" << processedDir_ << ": " << strerror(errno) << " (Error no " << errno << ")";
418 
419  //Stop further processing to prevent endless loop:
420  end_ = true;
421  }
422  if (verbosity_)
423  cout << "Done at " << now() << "\n";
424 
425  // cout << flush;
426 }
Definition: rename.py:1
std::unique_ptr< edm::streamer::StreamerInputFile > streamerInputFile_
static std::string now()
list cmd
Definition: mps_setup.py:244

◆ newHeader()

const bool WatcherStreamFileReader::newHeader ( )

Definition at line 157 of file WatcherStreamFileReader.cc.

References getInputFile().

157 { return getInputFile() != nullptr; }
edm::streamer::StreamerInputFile * getInputFile()

Member Data Documentation

◆ corruptedDir_

std::string WatcherStreamFileReader::corruptedDir_
private

Directory where a file must be moved if it is unreadable (e.g. it has zero size)

Definition at line 59 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ end_

bool WatcherStreamFileReader::end_
private

Definition at line 73 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), getNextEvent(), and moveJustReadFile().

◆ fileName_

std::string WatcherStreamFileReader::fileName_
private

Definition at line 65 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and moveJustReadFile().

◆ filePatterns_

std::vector<std::string> WatcherStreamFileReader::filePatterns_
private

Streamer file name pattern list

Definition at line 47 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ filesInQueue_

std::deque<std::string> WatcherStreamFileReader::filesInQueue_
private

Definition at line 71 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ inprocessDir_

std::string WatcherStreamFileReader::inprocessDir_
private

Directory where files are moved during processing

Definition at line 51 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ inputDir_

std::string WatcherStreamFileReader::inputDir_
private

Directory to look for streamer files

Definition at line 43 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ processedDir_

std::string WatcherStreamFileReader::processedDir_
private

Directory where file must be moved once processed

Definition at line 55 of file WatcherStreamFileReader.h.

Referenced by moveJustReadFile(), and WatcherStreamFileReader().

◆ streamerInputFile_

std::unique_ptr<edm::streamer::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_
private

Cached input file stream

Definition at line 63 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and moveJustReadFile().

◆ timeOut_

int WatcherStreamFileReader::timeOut_
private

Definition at line 69 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ tokenFile_

std::string WatcherStreamFileReader::tokenFile_
private

Definition at line 67 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ verbosity_

int WatcherStreamFileReader::verbosity_
private

Definition at line 75 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and moveJustReadFile().