CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/IOPool/Streamer/src/StreamerInputFile.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002 
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 #include "FWCore/Sources/interface/EventSkipperByID.h"
00005 #include "FWCore/Utilities/interface/DebugMacros.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 #include "FWCore/Utilities/interface/Exception.h"
00008 #include "FWCore/Utilities/interface/TimeOfDay.h"
00009 
00010 #include "Utilities/StorageFactory/interface/IOFlags.h"
00011 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00012 
00013 #include <iomanip>
00014 #include <iostream>
00015 
00016 namespace edm {
00017 
00018   StreamerInputFile::~StreamerInputFile() {
00019     if(storage_) {
00020       storage_->close();
00021       if(currentFileOpen_) logFileAction("  Closed file ");
00022     }
00023   }
00024 
00025   StreamerInputFile::StreamerInputFile(std::string const& name,
00026                                        int* numberOfEventsToSkip,
00027                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00028     startMsg_(),
00029     currentEvMsg_(),
00030     headerBuf_(1000*1000),
00031     eventBuf_(1000*1000*7),
00032     currentFile_(0),
00033     streamerNames_(),
00034     multiStreams_(false),
00035     currentFileName_(),
00036     currentFileOpen_(false),
00037     eventSkipperByID_(eventSkipperByID),
00038     numberOfEventsToSkip_(numberOfEventsToSkip),
00039     currRun_(0),
00040     currProto_(0),
00041     newHeader_(false),
00042     storage_(),
00043     endOfFile_(false) {
00044     openStreamerFile(name);
00045     readStartMessage();
00046   }
00047 
00048   StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00049                                        int* numberOfEventsToSkip,
00050                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00051     startMsg_(),
00052     currentEvMsg_(),
00053     headerBuf_(1000*1000),
00054     eventBuf_(1000*1000*7),
00055     currentFile_(0),
00056     streamerNames_(names),
00057     multiStreams_(true),
00058     currentFileName_(),
00059     currentFileOpen_(false),
00060     eventSkipperByID_(eventSkipperByID),
00061     numberOfEventsToSkip_(numberOfEventsToSkip),
00062     currRun_(0),
00063     currProto_(0),
00064     newHeader_(false),
00065     endOfFile_(false) {
00066     openStreamerFile(names.at(0));
00067     ++currentFile_;
00068     readStartMessage();
00069     currRun_ = startMsg_->run();
00070     currProto_ = startMsg_->protocolVersion();
00071   }
00072 
00073   void
00074   StreamerInputFile::openStreamerFile(std::string const& name) {
00075 
00076     if(storage_) {
00077       storage_->close();
00078       if(currentFileOpen_) logFileAction("  Closed file ");
00079     }
00080 
00081     currentFileName_ = name;
00082     currentFileOpen_ = false;
00083     logFileAction("  Initiating request to open file ");
00084 
00085     IOOffset size = -1;
00086     if(StorageFactory::get()->check(name.c_str(), &size)) {
00087       try {
00088         storage_.reset(StorageFactory::get()->open(name.c_str(),
00089                                                    IOFlags::OpenRead));
00090       }
00091       catch(cms::Exception& e) {
00092         Exception ex(errors::FileOpenError, "", e);
00093         ex.addContext("Calling StreamerInputFile::openStreamerFile()");
00094         ex.clearMessage();
00095         ex <<  "Error Opening Streamer Input File: " << name << "\n";
00096         throw ex;
00097       }
00098     } else {
00099       throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00100         << "Error Opening Streamer Input File, file does not exist: "
00101         << name << "\n";
00102     }
00103     currentFileOpen_ = true;
00104     logFileAction("  Successfully opened file ");
00105   }
00106 
00107   IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00108     IOSize n = 0;
00109     try {
00110       n = storage_->read(buf, nBytes);
00111     }
00112     catch(cms::Exception& ce) {
00113       Exception ex(errors::FileReadError, "", ce);
00114       ex.addContext("Calling StreamerInputFile::readBytes()");
00115       throw ex;
00116     }
00117     return n;
00118   }
00119 
00120   IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00121     IOOffset n = 0;
00122     try {
00123       // We wish to return the number of bytes skipped, not the final offset.
00124       n = storage_->position(0, Storage::CURRENT);
00125       n = storage_->position(nBytes, Storage::CURRENT) - n;
00126     }
00127     catch(cms::Exception& ce) {
00128       Exception ex(errors::FileReadError, "", ce);
00129       ex.addContext("Calling StreamerInputFile::skipBytes()");
00130       throw ex;
00131     }
00132     return n;
00133   }
00134 
00135   void StreamerInputFile::readStartMessage() {
00136     IOSize nWant = sizeof(HeaderView);
00137     IOSize nGot = readBytes(&headerBuf_[0], nWant);
00138     if(nGot != nWant) {
00139       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00140         << "Failed reading streamer file, first read in readStartMessage\n";
00141     }
00142 
00143     HeaderView head(&headerBuf_[0]);
00144     uint32 code = head.code();
00145     if(code != Header::INIT) 
00146     {
00147       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00148         << "Expecting an init Message at start of file\n";
00149       return;
00150     }
00151 
00152     uint32 headerSize = head.size();
00153     if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00154 
00155     if(headerSize > sizeof(HeaderView)) {
00156       nWant = headerSize - sizeof(HeaderView);
00157       nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00158       if(nGot != nWant) {
00159         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00160           << "Failed reading streamer file, second read in readStartMessage\n";
00161       }
00162     } else {
00163       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00164         << "Failed reading streamer file, init header size from data too small\n";
00165     }
00166 
00167     startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00168   }
00169 
00170   bool StreamerInputFile::next() {
00171     if(this->readEventMessage()) {
00172          return true;
00173     }
00174     if(multiStreams_) {
00175        //Try opening next file
00176        if(openNextFile()) {
00177           endOfFile_ = false;
00178           if(this->readEventMessage()) {
00179              return true;
00180           }
00181        }
00182     }
00183     return false;
00184   }
00185 
00186   bool StreamerInputFile::openNextFile() {
00187 
00188      if(currentFile_ <= streamerNames_.size() - 1) {
00189        FDEBUG(10) << "Opening file "
00190                   << streamerNames_.at(currentFile_).c_str() << std::endl;
00191 
00192        openStreamerFile(streamerNames_.at(currentFile_));
00193 
00194        // If start message was already there, then compare the
00195        // previous and new headers
00196        if(startMsg_) {
00197           FDEBUG(10) << "Comparing Header" << std::endl;
00198           if(!compareHeader()) {
00199               return false;
00200           }
00201        }
00202        ++currentFile_;
00203        return true;
00204      }
00205      return false;
00206   }
00207 
00208   bool StreamerInputFile::compareHeader() {
00209 
00210     //Get the new header
00211     readStartMessage();
00212 
00213     //Values from new Header should match up
00214     if(currRun_ != startMsg_->run() ||
00215         currProto_ != startMsg_->protocolVersion()) {
00216       throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00217         << "File " << streamerNames_.at(currentFile_)
00218         << "\nhas different run number or protocol version than previous\n";
00219       return false;
00220     }
00221     newHeader_ = true;
00222     return true;
00223   }
00224 
00225 
00226   int StreamerInputFile::readEventMessage() {
00227     if(endOfFile_) return 0;
00228 
00229     bool eventRead = false;
00230     while(!eventRead) {
00231 
00232       IOSize nWant = sizeof(EventHeader);
00233       IOSize nGot = readBytes(&eventBuf_[0], nWant);
00234       if(nGot != nWant) {
00235         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00236           << "Failed reading streamer file, first read in readEventMessage\n"
00237           << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00238       }
00239       HeaderView head(&eventBuf_[0]);
00240       uint32 code = head.code();
00241 
00242       // When we get the EOF record we know we have read all events
00243       // normally and are at the end, return 0 to indicate this
00244       if(code == Header::EOFRECORD) {
00245         endOfFile_ = true;
00246         return 0;
00247       }
00248       // If it is not an event nor EOFRECORD then something is wrong.
00249       if(code != Header::EVENT) {
00250         throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00251           << "Failed reading streamer file, unknown code in event header\n"
00252           << "code = " << code << "\n";
00253       }
00254       uint32 eventSize = head.size();
00255       if(eventSize <= sizeof(EventHeader)) {
00256         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00257           << "Failed reading streamer file, event header size from data too small\n";
00258       }
00259       eventRead = true;
00260       if(eventSkipperByID_) {
00261         EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00262         if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00263           eventRead = false;
00264         }
00265       }
00266       if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
00267         eventRead = false;
00268         --(*numberOfEventsToSkip_);
00269       }
00270       nWant = eventSize - sizeof(EventHeader);
00271       if(eventRead) {
00272         if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00273         nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00274         if(nGot != nWant) {
00275           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00276             << "Failed reading streamer file, second read in readEventMessage\n"
00277             << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00278         }
00279       } else {
00280         nGot = skipBytes(nWant);
00281         if(nGot != nWant) {
00282           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00283             << "Failed reading streamer file, skip event in readEventMessage\n"
00284             << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00285         }
00286       }
00287     }
00288     currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00289     return 1;
00290   }
00291 
00292   void StreamerInputFile::logFileAction(char const* msg) {
00293     LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00294     FlushMessageLog();
00295   }
00296 }