CMS 3D CMS Logo

List of all members | Public Member Functions | Private Attributes
WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

Public Member Functions

void closeFile ()
 
const InitMsgView * getHeader ()
 
edm::StreamerInputFile * getInputFile ()
 
const EventMsgView * getNextEvent ()
 
const bool newHeader ()
 
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 
 ~WatcherStreamFileReader ()
 

Private Attributes

std::string corruptedDir_
 
std::string curDir_
 
bool end_
 
std::string fileListCmd_
 
std::string fileName_
 
std::vector< std::string > filePatterns_
 
std::deque< std::string > filesInQueue_
 
std::string inprocessDir_
 
std::string inputDir_
 
std::string processedDir_
 
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
 
int timeOut_
 
std::string tokenFile_
 
int verbosity_
 

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.

Constructor & Destructor Documentation

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 124 of file WatcherStreamFileReader.cc.

References corruptedDir_, gather_cfg::cout, curDir_, dir, heppy_check::dirs, Exception, f, fileListCmd_, filePatterns_, mps_fire::i, inprocessDir_, inputDir_, eostools::mkdir(), now(), processedDir_, trackingPlots::stat, and tokenFile_.

124  :
125  inputDir_(pset.getParameter<std::string>("inputDir")),
126  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
127  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
128  processedDir_(pset.getParameter<std::string>("processedDir")),
129  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
130  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile",
131  "watcherSourceToken")),
132  timeOut_(pset.getParameter<int>("timeOutInSec")),
133  end_(false),
134  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)){
135  struct stat buf;
136  if(stat(tokenFile_.c_str(), &buf)){
137  FILE* f = fopen(tokenFile_.c_str(), "w");
138  if(f){
139  fclose(f);
140  } else{
141  throw cms::Exception("WatcherSource") << "Failed to create token file.";
142  }
143  }
144  vector<string> dirs;
145  dirs.push_back(inprocessDir_);
146  dirs.push_back(processedDir_);
147  dirs.push_back(corruptedDir_);
148 
149  for(unsigned i = 0; i < dirs.size(); ++i){
150  const string& dir = dirs[i];
151  struct stat fileStat;
152  if(0==stat(dir.c_str(), &fileStat)){
153  if(!S_ISDIR(fileStat.st_mode)){
154  throw cms::Exception("[WatcherSource]")
155  << "File " << dir << " exists but is not a directory "
156  << " as expected.";
157  }
158  } else {//directory does not exists, let's try to create it
159  if(0!=mkdir(dir.c_str(), 0755)){
160  throw cms::Exception("[WatcherSource]")
161  << "Failed to create directory " << dir
162  << " for writing data.";
163  }
164  }
165  }
166 
167  std::stringstream fileListCmdBuf;
168  fileListCmdBuf.str("");
169  // fileListCmdBuf << "/bin/ls -rt " << inputDir_ << " | egrep '(";
170  //by default ls will sort the file alphabetically which will results
171  //in ordering the files in increasing LB number, which is the desired
172  //order.
173  // fileListCmdBuf << "/bin/ls " << inputDir_ << " | egrep '(";
174  fileListCmdBuf << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
175  //TODO: validate patternDir (see ;, &&, ||) and escape special character
176  if(filePatterns_.empty()) throw cms::Exception("WacherSource", "filePatterns parameter is empty");
177  char curDir[PATH_MAX>0?PATH_MAX:4096];
178  if(getcwd(curDir, sizeof(curDir))==nullptr){
179  throw cms::Exception("WatcherSource")
180  << "Failed to retreived working directory path: "
181  << strerror(errno);
182  }
183  curDir_ = curDir;
184 
185  for(unsigned i = 0 ; i < filePatterns_.size(); ++i){
186  if(i>0) fileListCmdBuf << "|";
187  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
188  // fileListCmdBuf << curDir << "/";
189  // }
190  fileListCmdBuf << filePatterns_[i];
191  }
192  fileListCmdBuf << ")' | sort";
193 
194  fileListCmd_ = fileListCmdBuf.str();
195 
196  cout << "[WatcherSource " << now() << "]"
197  << " Command to retrieve input files: "
198  << fileListCmd_ << "\n";
199 
200 }
std::vector< std::string > filePatterns_
double f[11][100]
static std::string now()
def mkdir(path)
Definition: eostools.py:251
dbl *** dir
Definition: mlp_gen.cc:35
WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 202 of file WatcherStreamFileReader.cc.

202  {
203 }

Member Function Documentation

void WatcherStreamFileReader::closeFile ( )

Definition at line 410 of file WatcherStreamFileReader.cc.

References mps_setup::cmd, gather_cfg::cout, end_, Exception, fileName_, mps_fire::i, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent(), and Vispa.Main.Application.Application::tabCloseRequest().

410  {
411  if(streamerInputFile_.get()==nullptr) return;
412  //delete the streamer input file:
413  streamerInputFile_.reset();
414  stringstream cmd;
415  //TODO: validation of processDir
416  cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
417  if(verbosity_) cout << "[WatcherSource " << now() << "]"
418  << " Excuting " << cmd.str() << "\n";
419  int i = system(cmd.str().c_str());
420  if(i!=0){
421  throw cms::Exception("WatcherSource")
422  << "Failed to move processed file '" << fileName_ << "'"
423  << " to processed directory '" << processedDir_ << "'\n";
424  //Stop further processing to prevent endless loop:
425  end_ = true;
426  }
427  cout << flush;
428 }
static std::string now()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
list cmd
Definition: mps_setup.py:237
const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 210 of file WatcherStreamFileReader.cc.

References InitMsgView::code(), Exception, getInputFile(), RecoTauValidation_cfi::header, Header::INIT, analyzePatCleaning_cfg::inputFile, and edm::StreamerInputFile::startMessage().

210  {
211 
213 
214  //TODO: shall better send an exception...
215  if(inputFile==nullptr){
216  throw cms::Exception("WatcherSource") << "No input file found.";
217  }
218 
219  const InitMsgView* header = inputFile->startMessage();
220 
221  if(header->code() != Header::INIT) //INIT Msg
222  throw cms::Exception("readHeader","WatcherStreamFileReader")
223  << "received wrong message type: expected INIT, got "
224  << header->code() << "\n";
225 
226  return header;
227 }
InitMsgView const * startMessage() const
edm::StreamerInputFile * getInputFile()
uint32 code() const
Definition: InitMessage.h:72
edm::StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 243 of file WatcherStreamFileReader.cc.

References estimatePileup::basename, EnergyCorrector::c, popcon2dropbox::copy(), corruptedDir_, gather_cfg::cout, curDir_, mps_fire::dest, compare_using_db::dirname, dt, end_, Exception, f, fileListCmd_, MillePedeFileConverter_cfg::fileName, fileName_, filesInQueue_, mps_fire::i, inprocessDir_, inputDir_, createfilelist::int, gen::n, now(), alignCSCRings::s, findQualityFiles::size, trackingPlots::stat, streamerInputFile_, lumiQTWidget::t, ntuplemaker::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

243  {
244  char* lineptr = nullptr;
245  size_t n = 0;
246 
247  struct stat buf;
248 
249  if(stat(tokenFile_.c_str(), &buf)!=0){
250  end_ = true;
251  }
252 
253  bool waiting = false;
254  static bool firstWait = true;
255  timeval waitStart;
256  //if no cached input file, look for new files until one is found:
257  if(!end_ && streamerInputFile_.get()==nullptr){
258  fileName_.assign("");
259 
260  //check if we have file in the queue, if not look for new files:
261  while(filesInQueue_.empty()){
262  if(stat(tokenFile_.c_str(), &buf)!=0){
263  end_ = true;
264  break;
265  }
266  FILE* s = popen(fileListCmd_.c_str(), "r");
267  if(s==nullptr){
268  throw cms::Exception("WatcherSource")
269  << "Failed to retrieve list of input file: " << strerror(errno);
270  }
271 
272  ssize_t len;
273  while(!feof(s)){
274  if((len=getline(&lineptr, &n, s))>0){
275  //remove end-of-line character:
276  lineptr[len-1] = 0;
277  string fileName;
278  if(lineptr[0] != '/'){
279  if(!inputDir_.empty() && inputDir_[0] != '/'){//relative path
280  fileName.assign(curDir_);
281  fileName.append("/");
282  fileName.append(inputDir_);
283  } else{
284  fileName.assign(inputDir_);
285  }
286  fileName.append("/");
287  }
288  fileName.append(lineptr);
289  filesInQueue_.push_back(fileName);
290  if(verbosity_) cout << "[WatcherSource " << now() << "]"
291  << " File to process: '"
292  << fileName << "'\n";
293  }
294  }
295  while(!feof(s)) fgetc(s);
296  pclose(s);
297  if(filesInQueue_.empty()){
298  if(!waiting){
299  cout << "[WatcherSource " << now() << "]"
300  << " No file found. Waiting for new file...\n";
301  cout << flush;
302  waiting = true;
303  gettimeofday(&waitStart, nullptr);
304  } else if(!firstWait){
305  timeval t;
306  gettimeofday(&t, nullptr);
307  float dt = (t.tv_sec-waitStart.tv_sec) * 1.
308  + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
309  if((timeOut_ >= 0) && (dt > timeOut_)){
310  cout << "[WatcherSource " << now() << "]"
311  << " Having waited for new file for " << (int)dt << " sec. "
312  << "Timeout exceeded. Exits.\n";
313  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
314  end_ = true;
315  break;
316  }
317  }
318  }
319  sleep(1);
320  } //end of file queue update
321  firstWait = false;
322  free(lineptr); lineptr=nullptr;
323 
324  while(streamerInputFile_.get()==nullptr && !filesInQueue_.empty()){
325 
326  fileName_ = filesInQueue_.front();
327  filesInQueue_.pop_front();
328  int fd = open(fileName_.c_str(), 0);
329  if(fd!=0){
330  struct stat buf;
331  off_t size = -1;
332  //check that file transfer is finished, by monitoring its size:
333  time_t t = time(nullptr);
334  for(;;){
335  fstat(fd, &buf);
336  if(verbosity_) cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
337  if(buf.st_size==size) break; else size = buf.st_size;
338  if(difftime(t,buf.st_mtime)>60) break; //file older then 1 min=> tansfer must be finished
339  sleep(1);
340  }
341 
342  if(fd!=0 && buf.st_size == 0){//file is empty. streamer reader
343  // does not like empty file=> skip it
344  stringstream c;
345  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_
346  << "/.\"";
347  if(verbosity_) cout << "[WatcherSource " << now() << "]"
348  << " Excuting "
349  << c.str() << "\n";
350  int i = system(c.str().c_str());
351  if(i!=0){
352  //throw cms::Exception("WatcherSource")
353  cout << "[WatcherSource " << now() << "] "
354  << "Failed to move empty file '" << fileName_ << "'"
355  << " to corrupted directory '" << corruptedDir_ << "'\n";
356  }
357  continue;
358  }
359 
360  close(fd);
361 
362  vector<char> buf1(fileName_.size()+1);
363  copy(fileName_.begin(), fileName_.end(), buf1.begin());
364  buf1[buf1.size()-1] = 0;
365 
366  vector<char> buf2(fileName_.size()+1);
367  copy(fileName_.begin(), fileName_.end(), buf2.begin());
368  buf2[buf1.size()-1] = 0;
369 
370  string dirnam(dirname(&buf1[0]));
371  string filenam(basename(&buf2[0]));
372 
373  string dest = inprocessDir_ + "/" + filenam;
374 
375  if(verbosity_) cout << "[WatcherSource " << now() << "]"
376  << " Moving file "
377  << fileName_ << " to " << dest << "\n";
378 
379  stringstream c;
380  c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest
381  << "/.\"";
382 
383 
384  if(0!=rename(fileName_.c_str(), dest.c_str())){
385  //if(0!=system(c.str().c_str())){
386  throw cms::Exception("WatcherSource")
387  << "Failed to move file '" << fileName_ << "' "
388  << "to processing directory " << inprocessDir_
389  << ": " << strerror(errno);
390  }
391 
392  fileName_ = dest;
393 
394  cout << "[WatcherSource " << now() << "]"
395  << " Opening file " << fileName_ << "\n" << flush;
397  = unique_ptr<edm::StreamerInputFile>(new edm::StreamerInputFile(fileName_));
398 
399  ofstream f(".watcherfile");
400  f << fileName_;
401  } else{
402  cout << "[WatcherSource " << now() << "]"
403  << " Failed to open file " << fileName_ << endl;
404  }
405  } //loop on file queue to find one file which opening succeeded
406  }
407  return streamerInputFile_.get();
408 }
size
Write out results.
float dt
Definition: AMPTWrapper.h:126
Definition: rename.py:1
def copy(args, dbName)
std::deque< std::string > filesInQueue_
double f[11][100]
static std::string now()
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 229 of file WatcherStreamFileReader.cc.

References closeFile(), edm::StreamerInputFile::currentRecord(), end_, getInputFile(), analyzePatCleaning_cfg::inputFile, and edm::StreamerInputFile::next().

229  {
230  if(end_){ closeFile(); return nullptr;}
231 
233 
234  //go to next input file, till no new event is found
235  while((inputFile=getInputFile())!=nullptr
236  && inputFile->next()==0){
237  closeFile();
238  }
239 
240  return inputFile==nullptr?nullptr:inputFile->currentRecord();
241 }
edm::StreamerInputFile * getInputFile()
EventMsgView const * currentRecord() const
const bool WatcherStreamFileReader::newHeader ( )

Member Data Documentation

std::string WatcherStreamFileReader::corruptedDir_
private

Directory where a file is moved when it cannot be read (e.g. an empty file)

Definition at line 59 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::curDir_
private

Definition at line 79 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

bool WatcherStreamFileReader::end_
private

Definition at line 73 of file WatcherStreamFileReader.h.

Referenced by closeFile(), getInputFile(), and getNextEvent().

std::string WatcherStreamFileReader::fileListCmd_
private

Definition at line 77 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::fileName_
private

Definition at line 65 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

std::vector<std::string> WatcherStreamFileReader::filePatterns_
private

Streamer file name pattern list

Definition at line 46 of file WatcherStreamFileReader.h.

Referenced by WatcherStreamFileReader().

std::deque<std::string> WatcherStreamFileReader::filesInQueue_
private

Definition at line 71 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::inprocessDir_
private

Directory where files are moved while they are being processed

Definition at line 50 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::inputDir_
private

Directory to look for streamer files

Definition at line 42 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::processedDir_
private

Directory where files are moved once they have been processed

Definition at line 55 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and WatcherStreamFileReader().

std::unique_ptr<edm::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_
private

Cached input file stream

Definition at line 63 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

int WatcherStreamFileReader::timeOut_
private

Definition at line 69 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::tokenFile_
private

Definition at line 67 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

int WatcherStreamFileReader::verbosity_
private

Definition at line 75 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().