CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Attributes | Static Private Attributes
WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

Public Member Functions

void closeFile ()
 
const InitMsgView * getHeader ()
 
edm::StreamerInputFile * getInputFile ()
 
const EventMsgView * getNextEvent ()
 
const bool newHeader ()
 
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 
 ~WatcherStreamFileReader ()
 

Private Attributes

std::string corruptedDir_
 
bool end_
 
std::vector< std::string > filePatterns_
 
std::deque< std::string > filesInQueue_
 
std::string inprocessDir_
 
std::string inputDir_
 
std::string processedDir_
 
std::auto_ptr< edm::StreamerInputFile > streamerInputFile_
 
int timeOut_
 
std::string tokenFile_
 
int verbosity_
 

Static Private Attributes

static std::string fileName_
 

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.

Constructor & Destructor Documentation

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 124 of file WatcherStreamFileReader.cc.

References corruptedDir_, dir, edm::hlt::Exception, f, i, inprocessDir_, processedDir_, and tokenFile_.

124  :
125  inputDir_(pset.getParameter<std::string>("inputDir")),
126  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
127  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
128  processedDir_(pset.getParameter<std::string>("processedDir")),
129  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
130  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile",
131  "watcherSourceToken")),
132  timeOut_(pset.getParameter<int>("timeOutInSec")),
133  end_(false),
134  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)){
135  struct stat buf;
136  if(stat(tokenFile_.c_str(), &buf)){
137  FILE* f = fopen(tokenFile_.c_str(), "w");
138  if(f){
139  fclose(f);
140  } else{
141  throw cms::Exception("WatcherSource") << "Failed to create token file.";
142  }
143  }
144  vector<string> dirs;
145  dirs.push_back(inprocessDir_);
146  dirs.push_back(processedDir_);
147  dirs.push_back(corruptedDir_);
148 
149  for(unsigned i = 0; i < dirs.size(); ++i){
150  const string& dir = dirs[i];
151  struct stat fileStat;
152  if(0==stat(dir.c_str(), &fileStat)){
153  if(!S_ISDIR(fileStat.st_mode)){
154  throw cms::Exception("[WatcherSource]")
155  << "File " << dir << " exists but is not a directory "
156  << " as expected.";
157  }
158  } else {//directory does not exists, let's try to create it
159  if(0!=mkdir(dir.c_str(), 0755)){
160  throw cms::Exception("[WatcherSource]")
161  << "Failed to create directory " << dir
162  << " for writing data.";
163  }
164  }
165  }
166 }
std::vector< std::string > filePatterns_
int i
Definition: DBlmapReader.cc:9
double f[11][100]
dbl *** dir
Definition: mlp_gen.cc:35
WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 168 of file WatcherStreamFileReader.cc.

168  {
169 }

Member Function Documentation

void WatcherStreamFileReader::closeFile ( )

Definition at line 397 of file WatcherStreamFileReader.cc.

References asciidump::cmd, gather_cfg::cout, end_, edm::hlt::Exception, fileName_, i, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent(), and Vispa.Main.Application.Application::tabCloseRequest().

397  {
398  if(streamerInputFile_.get()==0) return;
399  //delete the streamer input file:
400  streamerInputFile_.reset();
401  stringstream cmd;
402  //TODO: validation of processDir
403  cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
404  if(verbosity_) cout << "[WatcherSource " << now() << "]"
405  << " Excuting " << cmd.str() << "\n";
406  int i = system(cmd.str().c_str());
407  if(i!=0){
408  throw cms::Exception("WatcherSource")
409  << "Failed to move processed file '" << fileName_ << "'"
410  << " to processed directory '" << processedDir_ << "'\n";
411  //Stop further processing to prevent endless loop:
412  end_ = true;
413  }
414  cout << flush;
415 }
int i
Definition: DBlmapReader.cc:9
std::auto_ptr< edm::StreamerInputFile > streamerInputFile_
string cmd
Definition: asciidump.py:19
static std::string now()
tuple cout
Definition: gather_cfg.py:121
const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 176 of file WatcherStreamFileReader.cc.

References InitMsgView::code(), edm::hlt::Exception, getInputFile(), errorMatrix2Lands::header, Header::INIT, analyzePatCOC_cfg::inputFile, and edm::StreamerInputFile::startMessage().

176  {
177 
179 
180  //TODO: shall better send an exception...
181  if(inputFile==0){
182  throw cms::Exception("WatcherSource") << "No input file found.";
183  }
184 
185  const InitMsgView* header = inputFile->startMessage();
186 
187  if(header->code() != Header::INIT) //INIT Msg
188  throw cms::Exception("readHeader","WatcherStreamFileReader")
189  << "received wrong message type: expected INIT, got "
190  << header->code() << "\n";
191 
192  return header;
193 }
InitMsgView const * startMessage() const
edm::StreamerInputFile * getInputFile()
uint32 code() const
Definition: InitMessage.h:66
edm::StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 209 of file WatcherStreamFileReader.cc.

References trackerHits::c, asciidump::cmd, filterCSVwithJSON::copy, corruptedDir_, gather_cfg::cout, alignCSCRings::dest, combineCards::dirname, dt, end_, edm::hlt::Exception, f, convertXMLtoSQLite_cfg::fileName, fileName_, filePatterns_, filesInQueue_, i, inprocessDir_, inputDir_, n, now(), alignCSCRings::s, findQualityFiles::size, stor::utils::sleep(), streamerInputFile_, lumiQTWidget::t, cond::rpcobgas::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

209  {
210  char* lineptr = 0;
211  size_t n = 0;
212  static stringstream cmd;
213  static bool cmdSet = false;
214  static char curDir[PATH_MAX>0?PATH_MAX:4096];
215 
216  if(!cmdSet){
217  cmd.str("");
218  cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
219  //TODO: validate patternDir (see ;, &&, ||) and escape special character
220  if(filePatterns_.size()==0) return 0;
221  if(getcwd(curDir, sizeof(curDir))==0){
222  throw cms::Exception("WatcherSource")
223  << "Failed to retreived working directory path: "
224  << strerror(errno);
225  }
226 
227  for(unsigned i = 0 ; i < filePatterns_.size(); ++i){
228  if(i>0) cmd << "|";
229  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
230  // cmd << curDir << "/";
231  // }
232  cmd << filePatterns_[i];
233  }
234  cmd << ")'";
235 
236  cout << "[WatcherSource " << now() << "]"
237  << " Command to retrieve input files: "
238  << cmd.str() << "\n";
239  cmdSet = true;
240  }
241 
242  struct stat buf;
243 
244  if(stat(tokenFile_.c_str(), &buf)!=0){
245  end_ = true;
246  }
247 
248  bool waiting = false;
249  static bool firstWait = true;
250  timeval waitStart;
251  //if no cached input file, look for new files until one is found:
252  if(!end_ && streamerInputFile_.get()==0){
253  fileName_.assign("");
254 
255  //check if we have file in the queue, if not look for new files:
256  while(filesInQueue_.size()==0){
257  if(stat(tokenFile_.c_str(), &buf)!=0){
258  end_ = true;
259  break;
260  }
261  FILE* s = popen(cmd.str().c_str(), "r");
262  if(s==0){
263  throw cms::Exception("WatcherSource")
264  << "Failed to retrieve list of input file: " << strerror(errno);
265  }
266 
267  ssize_t len;
268  while(!feof(s)){
269  if((len=getline(&lineptr, &n, s))>0){
270  //remove end-of-line character:
271  lineptr[len-1] = 0;
272  string fileName;
273  if(inputDir_.size()>0 && inputDir_[0] != '/'){//relative path
274  fileName.assign(curDir);
275  fileName.append("/");
276  fileName.append(inputDir_);
277  } else{
278  fileName.assign(inputDir_);
279  }
280  fileName.append("/");
281  fileName.append(lineptr);
282  filesInQueue_.push_back(fileName);
283  if(verbosity_) cout << "[WatcherSource " << now() << "]"
284  << " File to process: '"
285  << fileName << "'\n";
286  }
287  }
288  while(!feof(s)) fgetc(s);
289  pclose(s);
290  if(filesInQueue_.size()==0){
291  if(!waiting){
292  cout << "[WatcherSource " << now() << "]"
293  << " No file found. Waiting for new file...\n";
294  cout << flush;
295  waiting = true;
296  gettimeofday(&waitStart, 0);
297  } else if(!firstWait){
298  timeval t;
299  gettimeofday(&t, 0);
300  float dt = (t.tv_sec-waitStart.tv_sec) * 1.
301  + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
302  if((timeOut_ >= 0) && (dt > timeOut_)){
303  cout << "[WatcherSource " << now() << "]"
304  << " Having waited for new file for " << (int)dt << " sec. "
305  << "Timeout exceeded. Exits.\n";
306  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
307  end_ = true;
308  break;
309  }
310  }
311  }
312  sleep(1);
313  } //end of file queue update
314  firstWait = false;
315  free(lineptr); lineptr=0;
316 
317  while(streamerInputFile_.get()==0 && !filesInQueue_.empty()){
318 
319  fileName_ = filesInQueue_.front();
320  filesInQueue_.pop_front();
321  int fd = open(fileName_.c_str(), 0);
322  if(fd!=0){
323  struct stat buf;
324  off_t size = -1;
325  //check that file transfer is finished, by monitoring its size:
326  time_t t = time(0);
327  for(;;){
328  fstat(fd, &buf);
329  if(verbosity_) cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
330  if(buf.st_size==size) break; else size = buf.st_size;
331  if(difftime(t,buf.st_mtime)>60) break; //file older then 1 min=> tansfer must be finished
332  sleep(1);
333  }
334 
335  if(fd!=0 && buf.st_size == 0){//file is empty. streamer reader
336  // does not like empty file=> skip it
337  stringstream c;
338  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_
339  << "/.\"";
340  if(verbosity_) cout << "[WatcherSource " << now() << "]"
341  << " Excuting "
342  << c.str() << "\n";
343  int i = system(c.str().c_str());
344  if(i!=0){
345  //throw cms::Exception("WatcherSource")
346  cout << "[WatcherSource " << now() << "] "
347  << "Failed to move empty file '" << fileName_ << "'"
348  << " to corrupted directory '" << corruptedDir_ << "'\n";
349  }
350  continue;
351  }
352 
353  close(fd);
354 
355  vector<char> buf1(fileName_.size()+1);
356  copy(fileName_.begin(), fileName_.end(), buf1.begin());
357  buf1[buf1.size()-1] = 0;
358 
359  vector<char> buf2(fileName_.size()+1);
360  copy(fileName_.begin(), fileName_.end(), buf2.begin());
361  buf2[buf1.size()-1] = 0;
362 
363  string dirnam(dirname(&buf1[0]));
364  string filenam(basename(&buf2[0]));
365 
366  string dest = inprocessDir_ + "/" + filenam;
367 
368  if(verbosity_) cout << "[WatcherSource " << now() << "]"
369  << " Moving file "
370  << fileName_ << " to " << dest << "\n";
371 
372  if(0!=rename(fileName_.c_str(), dest.c_str())){
373  throw cms::Exception("WatcherSource")
374  << "Failed to move file '" << fileName_ << "' "
375  << "to processing directory " << inprocessDir_
376  << ": " << strerror(errno);
377  }
378 
379  fileName_ = dest;
380 
381  cout << "[WatcherSource " << now() << "]"
382  << " Opening file " << fileName_ << "\n" << flush;
383  streamerInputFile_
384  = auto_ptr<edm::StreamerInputFile>(new edm::StreamerInputFile(fileName_));
385 
386  ofstream f(".watcherfile");
387  f << fileName_;
388  } else{
389  cout << "[WatcherSource " << now() << "]"
390  << " Failed to open file " << fileName_ << endl;
391  }
392  } //loop on file queue to find one file which opening succeeded
393  }
394  return streamerInputFile_.get();
395 }
std::vector< std::string > filePatterns_
float dt
Definition: AMPTWrapper.h:126
int i
Definition: DBlmapReader.cc:9
std::deque< std::string > filesInQueue_
void sleep(Duration_t)
Definition: Utils.h:163
std::auto_ptr< edm::StreamerInputFile > streamerInputFile_
string cmd
Definition: asciidump.py:19
double f[11][100]
static std::string now()
tuple cout
Definition: gather_cfg.py:121
tuple size
Write out results.
const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 195 of file WatcherStreamFileReader.cc.

References closeFile(), edm::StreamerInputFile::currentRecord(), end_, getInputFile(), analyzePatCOC_cfg::inputFile, and edm::StreamerInputFile::next().

195  {
196  if(end_){ closeFile(); return 0;}
197 
199 
200  //go to next input file, till no new event is found
201  while((inputFile=getInputFile())!=0
202  && inputFile->next()==0){
203  closeFile();
204  }
205 
206  return inputFile==0?0:inputFile->currentRecord();
207 }
edm::StreamerInputFile * getInputFile()
EventMsgView const * currentRecord() const
const bool WatcherStreamFileReader::newHeader ( )

Member Data Documentation

std::string WatcherStreamFileReader::corruptedDir_
private

Directory where file must be moved if file is unreadable (e.g. empty size)

Definition at line 59 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

bool WatcherStreamFileReader::end_
private

Definition at line 73 of file WatcherStreamFileReader.h.

Referenced by closeFile(), getInputFile(), and getNextEvent().

std::string WatcherStreamFileReader::fileName_
static private

Definition at line 65 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

std::vector<std::string> WatcherStreamFileReader::filePatterns_
private

Streamer file name pattern list

Definition at line 46 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::deque<std::string> WatcherStreamFileReader::filesInQueue_
private

Definition at line 71 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::inprocessDir_
private

Directory where files are moved during processing

Definition at line 50 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::inputDir_
private

Directory to look for streamer files

Definition at line 42 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::processedDir_
private

Directory where file must be moved once processed

Definition at line 55 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and WatcherStreamFileReader().

std::auto_ptr<edm::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_
private

Cached input file stream

Definition at line 63 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

int WatcherStreamFileReader::timeOut_
private

Definition at line 69 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::tokenFile_
private

Definition at line 67 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

int WatcherStreamFileReader::verbosity_
private

Definition at line 75 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().