CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/IOPool/Streamer/src/StreamerInputFile.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002 
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 #include "FWCore/Sources/interface/EventSkipperByID.h"
00005 #include "FWCore/Utilities/interface/DebugMacros.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 #include "FWCore/Utilities/interface/Exception.h"
00008 #include "FWCore/Utilities/interface/TimeOfDay.h"
00009 
00010 #include "Utilities/StorageFactory/interface/IOFlags.h"
00011 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00012 
00013 #include <iomanip>
00014 #include <iostream>
00015 
00016 namespace edm {
00017 
00018   StreamerInputFile::~StreamerInputFile() {
00019     if(storage_) {
00020       storage_->close();
00021       if(currentFileOpen_) logFileAction("  Closed file ");
00022     }
00023   }
00024 
00025   StreamerInputFile::StreamerInputFile(std::string const& name,
00026                                        int* numberOfEventsToSkip,
00027                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00028     startMsg_(),
00029     currentEvMsg_(),
00030     headerBuf_(1000*1000),
00031     eventBuf_(1000*1000*7),
00032     multiStreams_(false),
00033     currentFileName_(),
00034     currentFileOpen_(false),
00035     eventSkipperByID_(eventSkipperByID),
00036     numberOfEventsToSkip_(numberOfEventsToSkip),
00037     newHeader_(false),
00038     endOfFile_(false) {
00039     openStreamerFile(name);
00040     readStartMessage();
00041   }
00042 
00043 
00044   StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00045                                        int* numberOfEventsToSkip,
00046                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00047     startMsg_(),
00048     currentEvMsg_(),
00049     headerBuf_(1000*1000),
00050     eventBuf_(1000*1000*7),
00051     currentFile_(0),
00052     streamerNames_(names),
00053     multiStreams_(true),
00054     currentFileName_(),
00055     currentFileOpen_(false),
00056     eventSkipperByID_(eventSkipperByID),
00057     numberOfEventsToSkip_(numberOfEventsToSkip),
00058     currRun_(0),
00059     currProto_(0),
00060     newHeader_(false),
00061     endOfFile_(false) {
00062     openStreamerFile(names.at(0));
00063     ++currentFile_;
00064     readStartMessage();
00065     currRun_ = startMsg_->run();
00066     currProto_ = startMsg_->protocolVersion();
00067   }
00068 
00069   void
00070   StreamerInputFile::openStreamerFile(std::string const& name) {
00071 
00072     if(storage_) {
00073       storage_->close();
00074       if(currentFileOpen_) logFileAction("  Closed file ");
00075     }
00076 
00077     currentFileName_ = name;
00078     currentFileOpen_ = false;
00079     logFileAction("  Initiating request to open file ");
00080 
00081     IOOffset size = -1;
00082     if(StorageFactory::get()->check(name.c_str(), &size)) {
00083       try {
00084         storage_.reset(StorageFactory::get()->open(name.c_str(),
00085                                                    IOFlags::OpenRead));
00086       }
00087       catch(cms::Exception& e) {
00088         Exception ex(errors::FileOpenError, "", e);
00089         ex.addContext("Calling StreamerInputFile::openStreamerFile()");
00090         ex.clearMessage();
00091         ex <<  "Error Opening Streamer Input File: " << name << "\n";
00092         throw ex;
00093       }
00094     } else {
00095       throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00096         << "Error Opening Streamer Input File, file does not exist: "
00097         << name << "\n";
00098     }
00099     currentFileOpen_ = true;
00100     logFileAction("  Successfully opened file ");
00101   }
00102 
00103   IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00104     IOSize n = 0;
00105     try {
00106       n = storage_->read(buf, nBytes);
00107     }
00108     catch(cms::Exception& ce) {
00109       Exception ex(errors::FileReadError, "", ce);
00110       ex.addContext("Calling StreamerInputFile::readBytes()");
00111       throw ex;
00112     }
00113     return n;
00114   }
00115 
00116   IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00117     IOOffset n = 0;
00118     try {
00119       // We wish to return the number of bytes skipped, not the final offset.
00120       n = storage_->position(0, Storage::CURRENT);
00121       n = storage_->position(nBytes, Storage::CURRENT) - n;
00122     }
00123     catch(cms::Exception& ce) {
00124       Exception ex(errors::FileReadError, "", ce);
00125       ex.addContext("Calling StreamerInputFile::skipBytes()");
00126       throw ex;
00127     }
00128     return n;
00129   }
00130 
00131   void StreamerInputFile::readStartMessage() {
00132     IOSize nWant = sizeof(HeaderView);
00133     IOSize nGot = readBytes(&headerBuf_[0], nWant);
00134     if(nGot != nWant) {
00135       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00136         << "Failed reading streamer file, first read in readStartMessage\n";
00137     }
00138 
00139     HeaderView head(&headerBuf_[0]);
00140     uint32 code = head.code();
00141     if(code != Header::INIT) 
00142     {
00143       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00144         << "Expecting an init Message at start of file\n";
00145       return;
00146     }
00147 
00148     uint32 headerSize = head.size();
00149     if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00150 
00151     if(headerSize > sizeof(HeaderView)) {
00152       nWant = headerSize - sizeof(HeaderView);
00153       nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00154       if(nGot != nWant) {
00155         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00156           << "Failed reading streamer file, second read in readStartMessage\n";
00157       }
00158     } else {
00159       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00160         << "Failed reading streamer file, init header size from data too small\n";
00161     }
00162 
00163     startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00164   }
00165 
00166   bool StreamerInputFile::next() {
00167     if(this->readEventMessage()) {
00168          return true;
00169     }
00170     if(multiStreams_) {
00171        //Try opening next file
00172        if(openNextFile()) {
00173           endOfFile_ = false;
00174           if(this->readEventMessage()) {
00175              return true;
00176           }
00177        }
00178     }
00179     return false;
00180   }
00181 
00182   bool StreamerInputFile::openNextFile() {
00183 
00184      if(currentFile_ <= streamerNames_.size() - 1) {
00185        FDEBUG(10) << "Opening file "
00186                   << streamerNames_.at(currentFile_).c_str() << std::endl;
00187 
00188        openStreamerFile(streamerNames_.at(currentFile_));
00189 
00190        // If start message was already there, then compare the
00191        // previous and new headers
00192        if(startMsg_) {
00193           FDEBUG(10) << "Comparing Header" << std::endl;
00194           if(!compareHeader()) {
00195               return false;
00196           }
00197        }
00198        ++currentFile_;
00199        return true;
00200      }
00201      return false;
00202   }
00203 
00204   bool StreamerInputFile::compareHeader() {
00205 
00206     //Get the new header
00207     readStartMessage();
00208 
00209     //Values from new Header should match up
00210     if(currRun_ != startMsg_->run() ||
00211         currProto_ != startMsg_->protocolVersion()) {
00212       throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00213         << "File " << streamerNames_.at(currentFile_)
00214         << "\nhas different run number or protocol version than previous\n";
00215       return false;
00216     }
00217     newHeader_ = true;
00218     return true;
00219   }
00220 
00221 
00222   int StreamerInputFile::readEventMessage() {
00223     if(endOfFile_) return 0;
00224 
00225     bool eventRead = false;
00226     while(!eventRead) {
00227 
00228       IOSize nWant = sizeof(EventHeader);
00229       IOSize nGot = readBytes(&eventBuf_[0], nWant);
00230       if(nGot != nWant) {
00231         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00232           << "Failed reading streamer file, first read in readEventMessage\n"
00233           << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00234       }
00235       HeaderView head(&eventBuf_[0]);
00236       uint32 code = head.code();
00237 
00238       // When we get the EOF record we know we have read all events
00239       // normally and are at the end, return 0 to indicate this
00240       if(code == Header::EOFRECORD) {
00241         endOfFile_ = true;
00242         return 0;
00243       }
00244       // If it is not an event nor EOFRECORD then something is wrong.
00245       if(code != Header::EVENT) {
00246         throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00247           << "Failed reading streamer file, unknown code in event header\n"
00248           << "code = " << code << "\n";
00249       }
00250       uint32 eventSize = head.size();
00251       if(eventSize <= sizeof(EventHeader)) {
00252         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00253           << "Failed reading streamer file, event header size from data too small\n";
00254       }
00255       eventRead = true;
00256       if(eventSkipperByID_) {
00257         EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00258         if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00259           eventRead = false;
00260         }
00261       }
00262       if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
00263         eventRead = false;
00264         --(*numberOfEventsToSkip_);
00265       }
00266       nWant = eventSize - sizeof(EventHeader);
00267       if(eventRead) {
00268         if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00269         nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00270         if(nGot != nWant) {
00271           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00272             << "Failed reading streamer file, second read in readEventMessage\n"
00273             << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00274         }
00275       } else {
00276         nGot = skipBytes(nWant);
00277         if(nGot != nWant) {
00278           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00279             << "Failed reading streamer file, skip event in readEventMessage\n"
00280             << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00281         }
00282       }
00283     }
00284     currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00285     return 1;
00286   }
00287 
00288   void StreamerInputFile::logFileAction(char const* msg) {
00289     LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00290     FlushMessageLog();
00291   }
00292 }