CMS 3D CMS Logo

Public Member Functions | Private Attributes | Static Private Attributes

WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

List of all members.

Public Member Functions

void closeFile ()
const InitMsgView * getHeader ()
edm::StreamerInputFile * getInputFile ()
const EventMsgView * getNextEvent ()
const bool newHeader ()
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 ~WatcherStreamFileReader ()

Private Attributes

std::string corruptedDir_
bool end_
std::vector< std::string > filePatterns_
std::deque< std::string > filesInQueue_
std::string inprocessDir_
std::string inputDir_
std::string processedDir_
std::auto_ptr< edm::StreamerInputFile > streamerInputFile_
int timeOut_
std::string tokenFile_
int verbosity_

Static Private Attributes

static std::string fileName_

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.


Constructor & Destructor Documentation

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 124 of file WatcherStreamFileReader.cc.

References corruptedDir_, dir, Exception, f, i, inprocessDir_, processedDir_, and tokenFile_.

                                                                           :
  inputDir_(pset.getParameter<std::string>("inputDir")),
  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
  processedDir_(pset.getParameter<std::string>("processedDir")),
  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile",
                                                     "watcherSourceToken")),
  timeOut_(pset.getParameter<int>("timeOutInSec")),
  end_(false),
  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)){
  // Make sure the token file exists: if stat() cannot see it, create it empty.
  // getInputFile() treats a missing token file as a request to stop.
  struct stat tokenStat;
  const bool tokenMissing = (stat(tokenFile_.c_str(), &tokenStat) != 0);
  if(tokenMissing){
    FILE* token = fopen(tokenFile_.c_str(), "w");
    if(token == 0){
      throw cms::Exception("WatcherSource") << "Failed to create token file.";
    }
    fclose(token);
  }

  // Every working directory must either already be a directory or be
  // creatable (mode 0755); anything else aborts construction.
  const string* const requiredDirs[] = { &inprocessDir_, &processedDir_, &corruptedDir_ };
  const size_t nDirs = sizeof(requiredDirs) / sizeof(requiredDirs[0]);
  for(size_t k = 0; k < nDirs; ++k){
    const string& path = *requiredDirs[k];
    struct stat dirStat;
    if(stat(path.c_str(), &dirStat) != 0){
      // Nothing at this path yet: try to create the directory.
      if(mkdir(path.c_str(), 0755) != 0){
        throw cms::Exception("[WatcherSource]")
          << "Failed to create directory " << path
          << " for writing data.";
      }
    } else if(!S_ISDIR(dirStat.st_mode)){
      throw cms::Exception("[WatcherSource]")
        << "File " << path << " exists but is not a directory "
        << " as expected.";
    }
  }
}
WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 168 of file WatcherStreamFileReader.cc.

                                                 {
  // Intentionally empty. Members (including the std::auto_ptr
  // streamerInputFile_) are released by their own destructors.
  // NOTE(review): closeFile() is not called here, so a file that is still
  // open at destruction is left in inprocessDir_ rather than moved to
  // processedDir_ — confirm this is the intended behaviour.
}

Member Function Documentation

void WatcherStreamFileReader::closeFile ( )

Definition at line 397 of file WatcherStreamFileReader.cc.

References asciidump::cmd, gather_cfg::cout, end_, Exception, fileName_, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent().

                                       {
  // Releases the current streamer file (if any) and moves it from the
  // in-process location to processedDir_ with /bin/mv.
  // Throws cms::Exception if the move fails; end_ is set first so that the
  // source stops instead of picking up the same file again.
  if(streamerInputFile_.get()==0) return;
  //delete the streamer input file:
  streamerInputFile_.reset();
  stringstream cmd;
  //TODO: validation of processDir
  //NOTE(review): fileName_ is spliced into a shell command inside double
  //quotes; a name containing '"', '$', '`' or '\' would be interpreted by
  //the shell.
  cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
  if(verbosity_) cout << "[WatcherSource " << now() << "]" 
                      << " Excuting " << cmd.str() << "\n"; 
  int i = system(cmd.str().c_str());
  if(i!=0){
    //Stop further processing to prevent endless loop. This must happen
    //before the throw: any statement after a throw is never executed.
    end_ = true;
    throw cms::Exception("WatcherSource")
      << "Failed to move processed file '" << fileName_ << "'"
      << " to processed directory '" << processedDir_ << "'\n";
  }
  cout << flush;
}
const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 176 of file WatcherStreamFileReader.cc.

References InitMsgView::code(), Exception, getInputFile(), Header::INIT, analyzePatCleaning_cfg::inputFile, and edm::StreamerInputFile::startMessage().

                                                     {
  // Fetch the streamer file currently being read (opening a new one if needed).
  edm::StreamerInputFile* const file = getInputFile();
  if(file == 0){
    // No input is available: the header cannot be produced.
    throw cms::Exception("WatcherSource") << "No input file found.";
  }

  // The first message of a streamer file must be the INIT message.
  const InitMsgView* const initMsg = file->startMessage();
  const bool isInit = (initMsg->code() == Header::INIT);
  if(!isInit){
    throw cms::Exception("readHeader","WatcherStreamFileReader")
      << "received wrong message type: expected INIT, got "
      << initMsg->code() << "\n";
  }
  return initMsg;
}
edm::StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 209 of file WatcherStreamFileReader.cc.

References trackerHits::c, asciidump::cmd, filterCSVwithJSON::copy, corruptedDir_, gather_cfg::cout, alignmentValidation::dest, dt, end_, Exception, f, convertXMLtoSQLite_cfg::fileName, fileName_, filePatterns_, filesInQueue_, i, inprocessDir_, inputDir_, now(), asciidump::s, findQualityFiles::size, stor::utils::sleep(), streamerInputFile_, matplotRender::t, cond::rpcobgas::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

                                                           {
  // Returns the streamer file currently being read.
  //
  // When no file is cached (and end_ is not set), the input directory is
  // polled with an "ls -rt | egrep" command, built once on the first call,
  // until matching files are queued. Polling stops early if:
  //  - the token file disappears, or
  //  - the wait exceeds timeOut_ seconds (the timeout is not applied during
  //    the very first wait of the job).
  // Queued files are then tried in order. Each one is given time to finish
  // growing, and empty files are moved to corruptedDir_. The first usable
  // file is renamed into inprocessDir_ and opened.
  //
  // Returns 0 if no file pattern is configured or no file could be opened.
  // Throws cms::Exception if:
  //  - the working directory is unknown,
  //  - the listing command cannot be started, or
  //  - a file cannot be renamed into inprocessDir_.
  char* lineptr = 0;
  size_t n = 0;
  static stringstream cmd;
  static bool cmdSet = false;
  static char curDir[PATH_MAX>0?PATH_MAX:4096];

  if(!cmdSet){
    cmd.str("");
    cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
    //TODO: validate patternDir (see ;, &&, ||) and escape special character
    if(filePatterns_.size()==0) return 0;
    if(getcwd(curDir, sizeof(curDir))==0){
      throw cms::Exception("WatcherSource")
        << "Failed to retreived working directory path: "
        << strerror(errno);
    }

    for(unsigned i = 0 ; i < filePatterns_.size(); ++i){
      if(i>0) cmd << "|";
      cmd << filePatterns_[i];
    }
    cmd << ")'";

    cout << "[WatcherSource " << now() << "]" 
         << " Command to retrieve input files: "
         << cmd.str() << "\n";
    cmdSet = true;
  }

  struct stat buf;

  //a missing token file requests the end of processing:
  if(stat(tokenFile_.c_str(), &buf)!=0){ 
    end_ = true; 
  }

  bool waiting = false;
  static bool firstWait = true;
  timeval waitStart;
  //if no cached input file, look for new files until one is found:
  if(!end_ && streamerInputFile_.get()==0){
    fileName_.assign("");

    //check if we have file in the queue, if not look for new files:
    while(filesInQueue_.size()==0){
      if(stat(tokenFile_.c_str(), &buf)!=0){ 
        end_ = true; 
        break;
      }
      FILE* s = popen(cmd.str().c_str(), "r");
      if(s==0){
        const int err = errno; //capture before any further library call
        free(lineptr); lineptr = 0;
        throw cms::Exception("WatcherSource")
          << "Failed to retrieve list of input file: " << strerror(err);
      }

      //one file name per line. getline() returns -1 on EOF *and* on error,
      //so a read error terminates the loop instead of spinning forever.
      ssize_t len;
      while((len=getline(&lineptr, &n, s))>0){
        //remove end-of-line character (the last line may not have one):
        if(lineptr[len-1]=='\n') lineptr[len-1] = 0;
        if(lineptr[0]==0) continue; //skip blank lines
        string fileName;
        if(inputDir_.size()>0 && inputDir_[0] != '/'){//relative path
          fileName.assign(curDir);
          fileName.append("/");
          fileName.append(inputDir_);
        } else{
          fileName.assign(inputDir_);
        }
        fileName.append("/");
        fileName.append(lineptr);
        filesInQueue_.push_back(fileName);
        if(verbosity_) cout << "[WatcherSource " << now() << "]" 
                            << " File to process: '"
                            << fileName << "'\n";
      }
      pclose(s);
      if(filesInQueue_.size()==0){
        if(!waiting){
          cout << "[WatcherSource " << now() << "]" 
               << " No file found. Waiting for new file...\n";
          cout << flush;
          waiting = true;
          gettimeofday(&waitStart, 0);
        } else if(!firstWait){
          timeval t;
          gettimeofday(&t, 0);
          float dt = (t.tv_sec-waitStart.tv_sec) * 1.
            + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
          if((timeOut_ >= 0) && (dt > timeOut_)){
            cout << "[WatcherSource " << now() << "]"
                 << " Having waited for new file for " << (int)dt << " sec. "
                 << "Timeout exceeded. Exits.\n";
            //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
            end_ = true;
            break;
          }
        }
      }
      sleep(1);
    } //end of file queue update
    firstWait = false;
    free(lineptr); lineptr=0;

    while(streamerInputFile_.get()==0 && !filesInQueue_.empty()){

      fileName_ = filesInQueue_.front();
      filesInQueue_.pop_front();

      //open() returns -1 on failure (0 is a valid descriptor):
      int fd = open(fileName_.c_str(), O_RDONLY);
      if(fd<0){
        cout << "[WatcherSource " << now() << "]" 
             << " Failed to open file " << fileName_ << endl;
        continue;
      }

      struct stat fileStat;
      off_t size = -1;
      bool statOk = true;
      //check that file transfer is finished, by monitoring its size:
      time_t t = time(0);
      for(;;){
        if(fstat(fd, &fileStat)!=0){ statOk = false; break; }
        if(verbosity_) cout << "file size: " << fileStat.st_size << ", prev size: "  << size << "\n";
        if(fileStat.st_size==size) break; else size = fileStat.st_size;
        if(difftime(t,fileStat.st_mtime)>60) break; //file older then 1 min=> tansfer must be finished
        sleep(1);
      }
      //the descriptor is only needed for the size check; release it on every
      //path (it previously leaked when an empty file was skipped):
      close(fd);

      if(!statOk){
        cout << "[WatcherSource " << now() << "]" 
             << " Failed to open file " << fileName_ << endl;
        continue;
      }

      if(fileStat.st_size == 0){//file is empty. streamer reader 
        //                   does not like empty file=> skip it
        stringstream c;
        c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_
          << "/.\"";
        if(verbosity_) cout << "[WatcherSource " << now() << "]" 
                            << " Excuting "
                            << c.str() << "\n"; 
        int i = system(c.str().c_str());
        if(i!=0){
          //best effort: report but keep processing the other files
          cout << "[WatcherSource " << now() << "] "
               << "Failed to move empty file '" << fileName_ << "'"
               << " to corrupted directory '" << corruptedDir_ << "'\n";
        }
        continue;
      }

      //basename() may modify its argument, hence the writable copy:
      vector<char> nameBuf(fileName_.begin(), fileName_.end());
      nameBuf.push_back(0);
      string filenam(basename(&nameBuf[0]));

      string dest  = inprocessDir_ + "/" + filenam;

      if(verbosity_) cout << "[WatcherSource " << now() << "]" 
                          << " Moving file "
                          << fileName_ << " to " << dest << "\n";

      if(0!=rename(fileName_.c_str(), dest.c_str())){
        throw cms::Exception("WatcherSource")
          << "Failed to move file '" << fileName_ << "' "
          << "to processing directory " << inprocessDir_
          << ": " << strerror(errno);
      }

      fileName_ = dest;

      cout << "[WatcherSource " << now() << "]" 
           << " Opening file " << fileName_ << "\n" << flush;
      streamerInputFile_
        = auto_ptr<edm::StreamerInputFile>(new edm::StreamerInputFile(fileName_));

      ofstream f(".watcherfile");
      f << fileName_; 
    } //loop on file queue to find one file which opening succeeded
  }
  return streamerInputFile_.get();
}
const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 195 of file WatcherStreamFileReader.cc.

References closeFile(), edm::StreamerInputFile::currentRecord(), end_, getInputFile(), analyzePatCleaning_cfg::inputFile, and edm::StreamerInputFile::next().

                                                         {
  // end_ is raised when processing must stop (e.g. token file removed or
  // wait timeout exceeded): release the current file and report no event.
  if(end_){
    closeFile();
    return 0;
  }

  // Walk through input files until one still has an event to deliver;
  // every exhausted file is closed (and moved) before trying the next one.
  edm::StreamerInputFile* inputFile = getInputFile();
  while(inputFile != 0 && inputFile->next() == 0){
    closeFile();
    inputFile = getInputFile();
  }

  if(inputFile == 0){
    return 0;
  }
  return inputFile->currentRecord();
}
const bool WatcherStreamFileReader::newHeader ( )

Member Data Documentation

std::string WatcherStreamFileReader::corruptedDir_ [private]

Directory where a file must be moved if it is unreadable (e.g. zero size)

Definition at line 59 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

bool WatcherStreamFileReader::end_ [private]

Definition at line 73 of file WatcherStreamFileReader.h.

Referenced by closeFile(), getInputFile(), and getNextEvent().

std::string WatcherStreamFileReader::fileName_ [static, private]

Definition at line 65 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

std::vector<std::string> WatcherStreamFileReader::filePatterns_ [private]

Streamer file name pattern list

Definition at line 46 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::deque<std::string> WatcherStreamFileReader::filesInQueue_ [private]

Definition at line 71 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::inprocessDir_ [private]

Directory where files are moved during processing

Definition at line 50 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

std::string WatcherStreamFileReader::inputDir_ [private]

Directory to look for streamer files

Definition at line 42 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::processedDir_ [private]

Directory where files must be moved once processed

Definition at line 55 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and WatcherStreamFileReader().

std::auto_ptr<edm::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_ [private]

Cached input file stream

Definition at line 63 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

int WatcherStreamFileReader::timeOut_ [private]

Definition at line 69 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

std::string WatcherStreamFileReader::tokenFile_ [private]

Definition at line 67 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

int WatcherStreamFileReader::verbosity_ [private]

Definition at line 75 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().