Go to the documentation of this file.00001 #include "IOPool/Streamer/interface/MsgTools.h"
00002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00003 #include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005
00006 #include <errno.h>
00007 #include <limits.h>
00008 #include <stdlib.h>
00009 #include <stdio.h>
00010 #include <cstring>
00011 #include <unistd.h>
00012 #include <sys/types.h>
00013 #include <sys/stat.h>
00014 #include <sys/time.h>
00015 #include <fcntl.h>
00016 #include <libgen.h>
00017 #include <fstream>
00018
00019
00020 using namespace std;
00021
00022 std::string WatcherStreamFileReader::fileName_;
00023
00024
00025 #ifdef __APPLE__
00026
00027
00028 #ifndef SIZE_MAX
00029 # define SIZE_MAX ((size_t) -1)
00030 #endif
00031 #ifndef SSIZE_MAX
00032 # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
00033 #endif
00034 namespace {
00035 ssize_t getline (char **lineptr, size_t *n, FILE *fp)
00036 {
00037 ssize_t result;
00038 size_t cur_len = 0;
00039
00040 if (lineptr == NULL || n == NULL || fp == NULL)
00041 {
00042 errno = EINVAL;
00043 return -1;
00044 }
00045
00046 if (*lineptr == NULL || *n == 0)
00047 {
00048 *n = 120;
00049 *lineptr = (char *) malloc (*n);
00050 if (*lineptr == NULL)
00051 {
00052 result = -1;
00053 goto end;
00054 }
00055 }
00056
00057 for (;;)
00058 {
00059 int i;
00060
00061 i = getc (fp);
00062 if (i == EOF)
00063 {
00064 result = -1;
00065 break;
00066 }
00067
00068
00069 if (cur_len + 1 >= *n)
00070 {
00071 size_t needed_max =
00072 SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
00073 size_t needed = 2 * *n + 1;
00074 char *new_lineptr;
00075
00076 if (needed_max < needed)
00077 needed = needed_max;
00078 if (cur_len + 1 >= needed)
00079 {
00080 result = -1;
00081 goto end;
00082 }
00083
00084 new_lineptr = (char *) realloc (*lineptr, needed);
00085 if (new_lineptr == NULL)
00086 {
00087 result = -1;
00088 goto end;
00089 }
00090
00091 *lineptr = new_lineptr;
00092 *n = needed;
00093 }
00094
00095 (*lineptr)[cur_len] = i;
00096 cur_len++;
00097
00098 if (i == '\n')
00099 break;
00100 }
00101 (*lineptr)[cur_len] = '\0';
00102 result = cur_len ? (ssize_t) cur_len : result;
00103
00104 end:
00105 return result;
00106 }
00107 }
00108 #endif
00109
00110 static std::string now(){
00111 struct timeval t;
00112 gettimeofday(&t, 0);
00113
00114 char buf[256];
00115 strftime(buf, sizeof(buf), "%F %R %S s", localtime(&t.tv_sec));
00116 buf[sizeof(buf)-1] = 0;
00117
00118 stringstream buf2;
00119 buf2 << buf << " " << ((t.tv_usec+500)/1000) << " ms";
00120
00121 return buf2.str();
00122 }
00123
00124 WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset):
00125 inputDir_(pset.getParameter<std::string>("inputDir")),
00126 filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
00127 inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
00128 processedDir_(pset.getParameter<std::string>("processedDir")),
00129 corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
00130 tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile",
00131 "watcherSourceToken")),
00132 timeOut_(pset.getParameter<int>("timeOutInSec")),
00133 end_(false),
00134 verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)){
00135 struct stat buf;
00136 if(stat(tokenFile_.c_str(), &buf)){
00137 FILE* f = fopen(tokenFile_.c_str(), "w");
00138 if(f){
00139 fclose(f);
00140 } else{
00141 throw cms::Exception("WatcherSource") << "Failed to create token file.";
00142 }
00143 }
00144 vector<string> dirs;
00145 dirs.push_back(inprocessDir_);
00146 dirs.push_back(processedDir_);
00147 dirs.push_back(corruptedDir_);
00148
00149 for(unsigned i = 0; i < dirs.size(); ++i){
00150 const string& dir = dirs[i];
00151 struct stat fileStat;
00152 if(0==stat(dir.c_str(), &fileStat)){
00153 if(!S_ISDIR(fileStat.st_mode)){
00154 throw cms::Exception("[WatcherSource]")
00155 << "File " << dir << " exists but is not a directory "
00156 << " as expected.";
00157 }
00158 } else {
00159 if(0!=mkdir(dir.c_str(), 0755)){
00160 throw cms::Exception("[WatcherSource]")
00161 << "Failed to create directory " << dir
00162 << " for writing data.";
00163 }
00164 }
00165 }
00166 }
00167
00168 WatcherStreamFileReader::~WatcherStreamFileReader(){
00169 }
00170
00171 const bool WatcherStreamFileReader::newHeader() {
00172 edm::StreamerInputFile* inputFile = getInputFile();
00173 return inputFile?inputFile->newHeader():false;
00174 }
00175
00176 const InitMsgView* WatcherStreamFileReader::getHeader(){
00177
00178 edm::StreamerInputFile* inputFile = getInputFile();
00179
00180
00181 if(inputFile==0){
00182 throw cms::Exception("WatcherSource") << "No input file found.";
00183 }
00184
00185 const InitMsgView* header = inputFile->startMessage();
00186
00187 if(header->code() != Header::INIT)
00188 throw cms::Exception("readHeader","WatcherStreamFileReader")
00189 << "received wrong message type: expected INIT, got "
00190 << header->code() << "\n";
00191
00192 return header;
00193 }
00194
00195 const EventMsgView* WatcherStreamFileReader::getNextEvent(){
00196 if(end_){ closeFile(); return 0;}
00197
00198 edm::StreamerInputFile* inputFile;
00199
00200
00201 while((inputFile=getInputFile())!=0
00202 && inputFile->next()==0){
00203 closeFile();
00204 }
00205
00206 return inputFile==0?0:inputFile->currentRecord();
00207 }
00208
00209 edm::StreamerInputFile* WatcherStreamFileReader::getInputFile(){
00210 char* lineptr = 0;
00211 size_t n = 0;
00212 static stringstream cmd;
00213 static bool cmdSet = false;
00214 static char curDir[PATH_MAX>0?PATH_MAX:4096];
00215
00216 if(!cmdSet){
00217 cmd.str("");
00218 cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
00219
00220 if(filePatterns_.size()==0) return 0;
00221 if(getcwd(curDir, sizeof(curDir))==0){
00222 throw cms::Exception("WatcherSource")
00223 << "Failed to retreived working directory path: "
00224 << strerror(errno);
00225 }
00226
00227 for(unsigned i = 0 ; i < filePatterns_.size(); ++i){
00228 if(i>0) cmd << "|";
00229
00230
00231
00232 cmd << filePatterns_[i];
00233 }
00234 cmd << ")'";
00235
00236 cout << "[WatcherSource " << now() << "]"
00237 << " Command to retrieve input files: "
00238 << cmd.str() << "\n";
00239 cmdSet = true;
00240 }
00241
00242 struct stat buf;
00243
00244 if(stat(tokenFile_.c_str(), &buf)!=0){
00245 end_ = true;
00246 }
00247
00248 bool waiting = false;
00249 static bool firstWait = true;
00250 timeval waitStart;
00251
00252 if(!end_ && streamerInputFile_.get()==0){
00253 fileName_.assign("");
00254
00255
00256 while(filesInQueue_.size()==0){
00257 if(stat(tokenFile_.c_str(), &buf)!=0){
00258 end_ = true;
00259 break;
00260 }
00261 FILE* s = popen(cmd.str().c_str(), "r");
00262 if(s==0){
00263 throw cms::Exception("WatcherSource")
00264 << "Failed to retrieve list of input file: " << strerror(errno);
00265 }
00266
00267 ssize_t len;
00268 while(!feof(s)){
00269 if((len=getline(&lineptr, &n, s))>0){
00270
00271 lineptr[len-1] = 0;
00272 string fileName;
00273 if(inputDir_.size()>0 && inputDir_[0] != '/'){
00274 fileName.assign(curDir);
00275 fileName.append("/");
00276 fileName.append(inputDir_);
00277 } else{
00278 fileName.assign(inputDir_);
00279 }
00280 fileName.append("/");
00281 fileName.append(lineptr);
00282 filesInQueue_.push_back(fileName);
00283 if(verbosity_) cout << "[WatcherSource " << now() << "]"
00284 << " File to process: '"
00285 << fileName << "'\n";
00286 }
00287 }
00288 while(!feof(s)) fgetc(s);
00289 pclose(s);
00290 if(filesInQueue_.size()==0){
00291 if(!waiting){
00292 cout << "[WatcherSource " << now() << "]"
00293 << " No file found. Waiting for new file...\n";
00294 cout << flush;
00295 waiting = true;
00296 gettimeofday(&waitStart, 0);
00297 } else if(!firstWait){
00298 timeval t;
00299 gettimeofday(&t, 0);
00300 float dt = (t.tv_sec-waitStart.tv_sec) * 1.
00301 + (t.tv_usec-waitStart.tv_usec) * 1.e-6;
00302 if((timeOut_ >= 0) && (dt > timeOut_)){
00303 cout << "[WatcherSource " << now() << "]"
00304 << " Having waited for new file for " << (int)dt << " sec. "
00305 << "Timeout exceeded. Exits.\n";
00306
00307 end_ = true;
00308 break;
00309 }
00310 }
00311 }
00312 sleep(1);
00313 }
00314 firstWait = false;
00315 free(lineptr); lineptr=0;
00316
00317 while(streamerInputFile_.get()==0 && !filesInQueue_.empty()){
00318
00319 fileName_ = filesInQueue_.front();
00320 filesInQueue_.pop_front();
00321 int fd = open(fileName_.c_str(), 0);
00322 if(fd!=0){
00323 struct stat buf;
00324 off_t size = -1;
00325
00326 time_t t = time(0);
00327 for(;;){
00328 fstat(fd, &buf);
00329 if(verbosity_) cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
00330 if(buf.st_size==size) break; else size = buf.st_size;
00331 if(difftime(t,buf.st_mtime)>60) break;
00332 sleep(1);
00333 }
00334
00335 if(fd!=0 && buf.st_size == 0){
00336
00337 stringstream c;
00338 c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_
00339 << "/.\"";
00340 if(verbosity_) cout << "[WatcherSource " << now() << "]"
00341 << " Excuting "
00342 << c.str() << "\n";
00343 int i = system(c.str().c_str());
00344 if(i!=0){
00345
00346 cout << "[WatcherSource " << now() << "] "
00347 << "Failed to move empty file '" << fileName_ << "'"
00348 << " to corrupted directory '" << corruptedDir_ << "'\n";
00349 }
00350 continue;
00351 }
00352
00353 close(fd);
00354
00355 vector<char> buf1(fileName_.size()+1);
00356 copy(fileName_.begin(), fileName_.end(), buf1.begin());
00357 buf1[buf1.size()-1] = 0;
00358
00359 vector<char> buf2(fileName_.size()+1);
00360 copy(fileName_.begin(), fileName_.end(), buf2.begin());
00361 buf2[buf1.size()-1] = 0;
00362
00363 string dirnam(dirname(&buf1[0]));
00364 string filenam(basename(&buf2[0]));
00365
00366 string dest = inprocessDir_ + "/" + filenam;
00367
00368 if(verbosity_) cout << "[WatcherSource " << now() << "]"
00369 << " Moving file "
00370 << fileName_ << " to " << dest << "\n";
00371
00372 if(0!=rename(fileName_.c_str(), dest.c_str())){
00373 throw cms::Exception("WatcherSource")
00374 << "Failed to move file '" << fileName_ << "' "
00375 << "to processing directory " << inprocessDir_
00376 << ": " << strerror(errno);
00377 }
00378
00379 fileName_ = dest;
00380
00381 cout << "[WatcherSource " << now() << "]"
00382 << " Opening file " << fileName_ << "\n" << flush;
00383 streamerInputFile_
00384 = auto_ptr<edm::StreamerInputFile>(new edm::StreamerInputFile(fileName_));
00385
00386 ofstream f(".watcherfile");
00387 f << fileName_;
00388 } else{
00389 cout << "[WatcherSource " << now() << "]"
00390 << " Failed to open file " << fileName_ << endl;
00391 }
00392 }
00393 }
00394 return streamerInputFile_.get();
00395 }
00396
00397 void WatcherStreamFileReader::closeFile(){
00398 if(streamerInputFile_.get()==0) return;
00399
00400 streamerInputFile_.reset();
00401 stringstream cmd;
00402
00403 cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
00404 if(verbosity_) cout << "[WatcherSource " << now() << "]"
00405 << " Excuting " << cmd.str() << "\n";
00406 int i = system(cmd.str().c_str());
00407 if(i!=0){
00408 throw cms::Exception("WatcherSource")
00409 << "Failed to move processed file '" << fileName_ << "'"
00410 << " to processed directory '" << processedDir_ << "'\n";
00411
00412 end_ = true;
00413 }
00414 cout << flush;
00415 }