CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc

Go to the documentation of this file.
#include "IOPool/Streamer/interface/MsgTools.h"
#include "IOPool/Streamer/interface/StreamerInputFile.h"
#include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
#include "FWCore/Utilities/interface/Exception.h"

#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstring>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
// Kept last: on glibc <libgen.h> #defines basename to the POSIX variant.
#include <libgen.h>
00018 
00019 //using namespace edm;
00020 using namespace std;
00021 
00022 std::string WatcherStreamFileReader::fileName_;
00023 
00024 
#if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
/* getline implementation adapted from glibc/gnulib, for C libraries that do
   not provide the POSIX.1-2008 getline(). */

#ifndef SIZE_MAX
# define SIZE_MAX ((size_t) -1)
#endif
#ifndef SSIZE_MAX
# define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
#endif
namespace {
/**
 * Reads one line, including its trailing '\n' if any, from fp into *lineptr.
 * The buffer is grown as needed and the result is NUL-terminated.
 *
 * @param lineptr in/out: heap buffer (may be NULL), reallocated as needed.
 * @param n       in/out: capacity of *lineptr in bytes; updated only after a
 *                successful (re)allocation.
 * @param fp      stream to read from.
 * @return number of characters stored (excluding the NUL), or -1 when EOF is
 *         reached before any character, or on error (errno is set to EINVAL,
 *         ENOMEM or EOVERFLOW).
 */
ssize_t getline (char **lineptr, size_t *n, FILE *fp)
{
    if (lineptr == NULL || n == NULL || fp == NULL)
    {
        errno = EINVAL;
        return -1;
    }

    if (*lineptr == NULL || *n == 0)
    {
        // realloc (not malloc): a non-NULL buffer passed with *n == 0 is
        // reused instead of leaked. realloc(NULL, size) behaves like malloc.
        const size_t initial = 120;
        char *fresh = static_cast<char *>(realloc (*lineptr, initial));
        if (fresh == NULL)
        {
            errno = ENOMEM;
            return -1;
        }
        *lineptr = fresh;
        *n = initial;
    }

    size_t cur_len = 0;
    for (;;)
    {
        const int c = getc (fp);
        if (c == EOF)
            break;

        /* Make enough space for len+1 (for final NUL) bytes.  */
        if (cur_len + 1 >= *n)
        {
            const size_t needed_max =
                SSIZE_MAX < SIZE_MAX ? static_cast<size_t>(SSIZE_MAX) + 1 : SIZE_MAX;
            size_t needed = 2 * *n + 1;   /* Be generous. */

            if (needed_max < needed)
                needed = needed_max;
            if (cur_len + 1 >= needed)
            {
                // The line cannot be represented in a ssize_t result.
                errno = EOVERFLOW;
                return -1;
            }

            char *grown = static_cast<char *>(realloc (*lineptr, needed));
            if (grown == NULL)
            {
                errno = ENOMEM;
                return -1;
            }

            *lineptr = grown;
            *n = needed;
        }

        (*lineptr)[cur_len] = static_cast<char>(c);
        cur_len++;

        if (c == '\n')
            break;
    }
    (*lineptr)[cur_len] = '\0';
    return cur_len ? static_cast<ssize_t>(cur_len) : -1;
}
}
#endif
00109 
/**
 * Returns the current local time as "<strftime "%F %R %S s"> <ms> ms".
 * Used as a timestamp prefix in log messages.
 */
static std::string now(){
  struct timeval t;
  gettimeofday(&t, 0);

  // localtime_r fills a caller-owned struct instead of the static buffer
  // shared by localtime(), so concurrent callers cannot clobber each other.
  struct tm broken;
  char buf[256];
  if(localtime_r(&t.tv_sec, &broken) == 0
     || strftime(buf, sizeof(buf), "%F %R %S s", &broken) == 0){
    // strftime leaves buf indeterminate on failure: emit an empty date part.
    buf[0] = 0;
  }

  // Truncate rather than round microseconds, so the millisecond part stays
  // within [0, 999] and can never read "1000 ms".
  std::stringstream buf2;
  buf2 << buf << " " << (t.tv_usec / 1000) << " ms";

  return buf2.str();
}
00123 
00124 WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset):
00125   inputDir_(pset.getParameter<std::string>("inputDir")),
00126   filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
00127   inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
00128   processedDir_(pset.getParameter<std::string>("processedDir")),
00129   corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
00130   tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile",
00131                                                      "watcherSourceToken")),
00132   timeOut_(pset.getParameter<int>("timeOutInSec")),
00133   end_(false),
00134   verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)){
00135   struct stat buf;
00136   if(stat(tokenFile_.c_str(), &buf)){
00137     FILE* f = fopen(tokenFile_.c_str(), "w");
00138     if(f){
00139       fclose(f);
00140     } else{
00141       throw cms::Exception("WatcherSource") << "Failed to create token file.";
00142     }
00143   }
00144   vector<string> dirs;
00145   dirs.push_back(inprocessDir_);
00146   dirs.push_back(processedDir_);
00147   dirs.push_back(corruptedDir_);
00148   
00149   for(unsigned i = 0; i < dirs.size(); ++i){
00150     const string& dir = dirs[i];
00151     struct stat fileStat;
00152     if(0==stat(dir.c_str(), &fileStat)){
00153       if(!S_ISDIR(fileStat.st_mode)){
00154         throw cms::Exception("[WatcherSource]")
00155           << "File " << dir << " exists but is not a directory "
00156           << " as expected.";
00157       }
00158     } else {//directory does not exists, let's try to create it
00159       if(0!=mkdir(dir.c_str(), 0755)){
00160         throw cms::Exception("[WatcherSource]")
00161           << "Failed to create directory " << dir
00162           << " for writing data.";
00163       }
00164     }
00165   }
00166 }
00167 
00168 WatcherStreamFileReader::~WatcherStreamFileReader(){
00169 }
00170 
00171 const bool WatcherStreamFileReader::newHeader() {
00172   edm::StreamerInputFile* inputFile = getInputFile();
00173   return inputFile?inputFile->newHeader():false;
00174 }
00175 
00176 const InitMsgView* WatcherStreamFileReader::getHeader(){
00177 
00178   edm::StreamerInputFile* inputFile = getInputFile();
00179 
00180   //TODO: shall better send an exception...
00181   if(inputFile==0){
00182     throw cms::Exception("WatcherSource") << "No input file found.";
00183   }
00184   
00185   const InitMsgView* header = inputFile->startMessage();
00186   
00187   if(header->code() != Header::INIT) //INIT Msg
00188     throw cms::Exception("readHeader","WatcherStreamFileReader")
00189       << "received wrong message type: expected INIT, got "
00190       << header->code() << "\n";
00191     
00192   return header;
00193 }
00194   
00195 const EventMsgView* WatcherStreamFileReader::getNextEvent(){
00196   if(end_){ closeFile(); return 0;}
00197   
00198   edm::StreamerInputFile* inputFile;
00199 
00200   //go to next input file, till no new event is found
00201   while((inputFile=getInputFile())!=0
00202         && inputFile->next()==0){
00203     closeFile();
00204   }
00205 
00206   return inputFile==0?0:inputFile->currentRecord();
00207 }
00208 
00209 edm::StreamerInputFile* WatcherStreamFileReader::getInputFile(){
00210   char* lineptr = 0;
00211   size_t n = 0;
00212   static stringstream cmd;
00213   static bool cmdSet = false;
00214   static char curDir[PATH_MAX>0?PATH_MAX:4096];
00215 
00216   if(!cmdSet){
00217     cmd.str("");
00218     cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
00219     //TODO: validate patternDir (see ;, &&, ||) and escape special character
00220     if(filePatterns_.size()==0) return 0;
00221     if(getcwd(curDir, sizeof(curDir))==0){
00222       throw cms::Exception("WatcherSource")
00223         << "Failed to retreived working directory path: "
00224         << strerror(errno);
00225     }
00226     
00227     for(unsigned i = 0 ; i < filePatterns_.size(); ++i){
00228       if(i>0) cmd << "|";
00229       //     if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
00230       //       cmd << curDir << "/";
00231       //     }
00232       cmd << filePatterns_[i];
00233     }
00234     cmd << ")'";
00235     
00236     cout << "[WatcherSource " << now() << "]" 
00237          << " Command to retrieve input files: "
00238          << cmd.str() << "\n";
00239     cmdSet = true;
00240   }
00241 
00242   struct stat buf;
00243   
00244   if(stat(tokenFile_.c_str(), &buf)!=0){ 
00245     end_ = true; 
00246   }
00247   
00248   bool waiting = false;
00249   static bool firstWait = true;
00250   timeval waitStart;
00251   //if no cached input file, look for new files until one is found:
00252   if(!end_ && streamerInputFile_.get()==0){
00253     fileName_.assign("");
00254     
00255     //check if we have file in the queue, if not look for new files:
00256     while(filesInQueue_.size()==0){
00257       if(stat(tokenFile_.c_str(), &buf)!=0){ 
00258         end_ = true; 
00259         break;
00260       }
00261       FILE* s = popen(cmd.str().c_str(), "r");
00262       if(s==0){
00263         throw cms::Exception("WatcherSource")
00264           << "Failed to retrieve list of input file: " << strerror(errno);
00265       }
00266       
00267       ssize_t len;
00268       while(!feof(s)){
00269         if((len=getline(&lineptr, &n, s))>0){
00270           //remove end-of-line character:
00271           lineptr[len-1] = 0;
00272           string fileName;
00273           if(inputDir_.size()>0 && inputDir_[0] != '/'){//relative path
00274             fileName.assign(curDir);
00275             fileName.append("/");
00276             fileName.append(inputDir_);
00277           } else{
00278             fileName.assign(inputDir_);
00279           }
00280           fileName.append("/");
00281           fileName.append(lineptr);
00282           filesInQueue_.push_back(fileName);
00283           if(verbosity_) cout << "[WatcherSource " << now() << "]" 
00284                               << " File to process: '"
00285                               << fileName << "'\n";
00286         }
00287       }
00288       while(!feof(s)) fgetc(s);
00289       pclose(s);
00290       if(filesInQueue_.size()==0){
00291         if(!waiting){
00292           cout << "[WatcherSource " << now() << "]" 
00293                << " No file found. Waiting for new file...\n";
00294           cout << flush;
00295           waiting = true;
00296           gettimeofday(&waitStart, 0);
00297         } else if(!firstWait){
00298           timeval t;
00299           gettimeofday(&t, 0);
00300           float dt = (t.tv_sec-waitStart.tv_sec) * 1.
00301             + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
00302           if((timeOut_ >= 0) && (dt > timeOut_)){
00303             cout << "[WatcherSource " << now() << "]"
00304                  << " Having waited for new file for " << (int)dt << " sec. "
00305                  << "Timeout exceeded. Exits.\n";
00306             //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
00307             end_ = true;
00308             break;
00309           }
00310         }
00311       }
00312       sleep(1);
00313     } //end of file queue update
00314     firstWait = false;
00315     free(lineptr); lineptr=0;
00316     
00317     while(streamerInputFile_.get()==0 && !filesInQueue_.empty()){
00318 
00319       fileName_ = filesInQueue_.front();
00320       filesInQueue_.pop_front();
00321       int fd = open(fileName_.c_str(), 0);
00322       if(fd!=0){
00323         struct stat buf;
00324         off_t size = -1;
00325         //check that file transfer is finished, by monitoring its size:
00326         time_t t = time(0);
00327         for(;;){
00328           fstat(fd, &buf);
00329           if(verbosity_) cout << "file size: " << buf.st_size << ", prev size: "  << size << "\n";
00330           if(buf.st_size==size) break; else size = buf.st_size;
00331           if(difftime(t,buf.st_mtime)>60) break; //file older then 1 min=> tansfer must be finished
00332           sleep(1);
00333         }
00334 
00335         if(fd!=0 && buf.st_size == 0){//file is empty. streamer reader 
00336           //                   does not like empty file=> skip it
00337           stringstream c;
00338           c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_
00339             << "/.\"";
00340           if(verbosity_) cout << "[WatcherSource " << now() << "]" 
00341                               << " Excuting "
00342                               << c.str() << "\n"; 
00343           int i = system(c.str().c_str());
00344           if(i!=0){
00345             //throw cms::Exception("WatcherSource")
00346             cout << "[WatcherSource " << now() << "] "
00347                  << "Failed to move empty file '" << fileName_ << "'"
00348                  << " to corrupted directory '" << corruptedDir_ << "'\n";
00349           }
00350           continue;
00351         }
00352         
00353         close(fd);
00354 
00355         vector<char> buf1(fileName_.size()+1);
00356         copy(fileName_.begin(), fileName_.end(), buf1.begin());
00357         buf1[buf1.size()-1] = 0;
00358         
00359         vector<char> buf2(fileName_.size()+1);
00360         copy(fileName_.begin(), fileName_.end(), buf2.begin());
00361         buf2[buf1.size()-1] = 0;
00362         
00363         string dirnam(dirname(&buf1[0]));
00364         string filenam(basename(&buf2[0]));
00365         
00366         string dest  = inprocessDir_ + "/" + filenam;
00367         
00368         if(verbosity_) cout << "[WatcherSource " << now() << "]" 
00369                             << " Moving file "
00370                             << fileName_ << " to " << dest << "\n";
00371         
00372         if(0!=rename(fileName_.c_str(), dest.c_str())){
00373           throw cms::Exception("WatcherSource")
00374             << "Failed to move file '" << fileName_ << "' "
00375             << "to processing directory " << inprocessDir_
00376             << ": " << strerror(errno);
00377         }
00378         
00379         fileName_ = dest;
00380 
00381         cout << "[WatcherSource " << now() << "]" 
00382              << " Opening file " << fileName_ << "\n" << flush;
00383         streamerInputFile_
00384           = auto_ptr<edm::StreamerInputFile>(new edm::StreamerInputFile(fileName_));
00385 
00386         ofstream f(".watcherfile");
00387         f << fileName_; 
00388       } else{
00389         cout << "[WatcherSource " << now() << "]" 
00390              << " Failed to open file " << fileName_ << endl;
00391       }
00392     } //loop on file queue to find one file which opening succeeded
00393   }
00394   return streamerInputFile_.get();
00395 }
00396 
00397 void WatcherStreamFileReader::closeFile(){
00398   if(streamerInputFile_.get()==0) return;
00399   //delete the streamer input file:
00400   streamerInputFile_.reset();
00401   stringstream cmd;
00402   //TODO: validation of processDir
00403   cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
00404   if(verbosity_) cout << "[WatcherSource " << now() << "]" 
00405                       << " Excuting " << cmd.str() << "\n"; 
00406   int i = system(cmd.str().c_str());
00407   if(i!=0){
00408     throw cms::Exception("WatcherSource")
00409       << "Failed to move processed file '" << fileName_ << "'"
00410       << " to processed directory '" << processedDir_ << "'\n";
00411     //Stop further processing to prevent endless loop:
00412     end_ = true;
00413   }
00414   cout << flush;
00415 }