CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/IOPool/Streamer/src/StreamerInputFile.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002 #include "FWCore/Utilities/interface/Exception.h"
00003 #include "FWCore/Utilities/interface/EDMException.h"
00004 #include "FWCore/Utilities/interface/DebugMacros.h"
00005 #include "FWCore/Utilities/interface/TimeOfDay.h"
00006 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00008 #include "FWCore/Sources/interface/EventSkipperByID.h"
00009 
00010 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00011 #include "Utilities/StorageFactory/interface/IOFlags.h"
00012 
00013 #include <iomanip>
00014 #include <iostream>
00015 
00016 namespace edm {
00017 
00018   StreamerInputFile::~StreamerInputFile() {
00019     if (storage_) {
00020       storage_->close();
00021       if (currentFileOpen_) logFileAction("  Closed file ");
00022     }
00023   }
00024 
00025   StreamerInputFile::StreamerInputFile(std::string const& name,
00026                                        int* numberOfEventsToSkip,
00027                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00028     startMsg_(),
00029     currentEvMsg_(),
00030     headerBuf_(1000*1000),
00031     eventBuf_(1000*1000*7),
00032     multiStreams_(false),
00033     currentFileName_(),
00034     currentFileOpen_(false),
00035     eventSkipperByID_(eventSkipperByID),
00036     numberOfEventsToSkip_(numberOfEventsToSkip),
00037     newHeader_(false),
00038     endOfFile_(false) {
00039     openStreamerFile(name);
00040     readStartMessage();
00041   }
00042 
00043 
00044   StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00045                                        int* numberOfEventsToSkip,
00046                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00047     startMsg_(),
00048     currentEvMsg_(),
00049     headerBuf_(1000*1000),
00050     eventBuf_(1000*1000*7),
00051     currentFile_(0),
00052     streamerNames_(names),
00053     multiStreams_(true),
00054     currentFileName_(),
00055     currentFileOpen_(false),
00056     eventSkipperByID_(eventSkipperByID),
00057     numberOfEventsToSkip_(numberOfEventsToSkip),
00058     currRun_(0),
00059     currProto_(0),
00060     newHeader_(false),
00061     endOfFile_(false) {
00062     openStreamerFile(names.at(0));
00063     ++currentFile_;
00064     readStartMessage();
00065     currRun_ = startMsg_->run();
00066     currProto_ = startMsg_->protocolVersion();
00067   }
00068 
00069   void
00070   StreamerInputFile::openStreamerFile(std::string const& name) {
00071 
00072     if (storage_) {
00073       storage_->close();
00074       if (currentFileOpen_) logFileAction("  Closed file ");
00075     }
00076 
00077     currentFileName_ = name;
00078     currentFileOpen_ = false;
00079     logFileAction("  Initiating request to open file ");
00080 
00081     IOOffset size = -1;
00082     if (StorageFactory::get()->check(name.c_str(), &size)) {
00083       try {
00084         storage_.reset(StorageFactory::get()->open(name.c_str(),
00085                                                    IOFlags::OpenRead));
00086       }
00087       catch (cms::Exception& e) {
00088         throw Exception(errors::FileOpenError,"StreamerInputFile::openStreamerFile")
00089           << "Error Opening Streamer Input File: " << name << "\n"
00090           << e.explainSelf() << "\n";
00091       }
00092     }
00093     else {
00094       throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00095         << "Error Opening Streamer Input File, file does not exist: "
00096         << name << "\n";
00097     }
00098     currentFileOpen_ = true;
00099     logFileAction("  Successfully opened file ");
00100   }
00101 
00102   IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00103     IOSize n = 0;
00104     try {
00105       n = storage_->read(buf, nBytes);
00106     }
00107     catch (cms::Exception& ce) {
00108       throw Exception(errors::FileReadError, "StreamerInputFile::readBytes")
00109         << "Failed reading streamer file in function readBytes\n"
00110         << ce.explainSelf() << "\n";
00111     }
00112     return n;
00113   }
00114 
00115   IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00116     IOOffset n = 0;
00117     try {
00118       // We wish to return the number of bytes skipped, not the final offset.
00119       n = storage_->position(0, Storage::CURRENT);
00120       n = storage_->position(nBytes, Storage::CURRENT) - n;
00121     }
00122     catch (cms::Exception& ce) {
00123       throw Exception(errors::FileReadError, "StreamerInputFile::skipBytes")
00124         << "Failed reading streamer file in function skipBytes\n"
00125         << ce.explainSelf() << "\n";
00126     }
00127     return n;
00128   }
00129 
00130 
00131   void StreamerInputFile::readStartMessage() {
00132     IOSize nWant = sizeof(HeaderView);
00133     IOSize nGot = readBytes(&headerBuf_[0], nWant);
00134     if (nGot != nWant) {
00135       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00136         << "Failed reading streamer file, first read in readStartMessage\n";
00137     }
00138 
00139     HeaderView head(&headerBuf_[0]);
00140     uint32 code = head.code();
00141     if (code != Header::INIT) 
00142     {
00143       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00144         << "Expecting an init Message at start of file\n";
00145       return;
00146     }
00147 
00148     uint32 headerSize = head.size();
00149     if (headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00150 
00151     if (headerSize > sizeof(HeaderView)) {
00152       nWant = headerSize - sizeof(HeaderView);
00153       nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00154       if (nGot != nWant) {
00155         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00156           << "Failed reading streamer file, second read in readStartMessage\n";
00157       }
00158     }
00159     else {
00160       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00161         << "Failed reading streamer file, init header size from data too small\n";
00162     }
00163 
00164     startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00165   }
00166 
00167   bool StreamerInputFile::next() {
00168     if (this->readEventMessage()) {
00169          return true;
00170     }
00171     if (multiStreams_) {
00172        //Try opening next file
00173        if (openNextFile()) {
00174           endOfFile_ = false;
00175           if (this->readEventMessage()) {
00176              return true;
00177           }
00178        }
00179     }
00180     return false;
00181   }
00182 
00183   bool StreamerInputFile::openNextFile() {
00184 
00185      if (currentFile_ <= streamerNames_.size() - 1) {
00186        FDEBUG(10) << "Opening file "
00187                   << streamerNames_.at(currentFile_).c_str() << std::endl;
00188 
00189        openStreamerFile(streamerNames_.at(currentFile_));
00190 
00191        // If start message was already there, then compare the
00192        // previous and new headers
00193        if (startMsg_) {
00194           FDEBUG(10) << "Comparing Header" << std::endl;
00195           if (!compareHeader()) {
00196               return false;
00197           }
00198        }
00199        ++currentFile_;
00200        return true;
00201      }
00202      return false;
00203   }
00204 
00205   bool StreamerInputFile::compareHeader() {
00206 
00207     //Get the new header
00208     readStartMessage();
00209 
00210     //Values from new Header should match up
00211     if (currRun_ != startMsg_->run() ||
00212         currProto_ != startMsg_->protocolVersion()) {
00213       throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00214         << "File " << streamerNames_.at(currentFile_)
00215         << "\nhas different run number or protocol version than previous\n";
00216       return false;
00217     }
00218     newHeader_ = true;
00219     return true;
00220   }
00221 
00222 
00223   int StreamerInputFile::readEventMessage() {
00224     if (endOfFile_) return 0;
00225 
00226     bool eventRead = false;
00227     while (!eventRead) {
00228 
00229       IOSize nWant = sizeof(EventHeader);
00230       IOSize nGot = readBytes(&eventBuf_[0], nWant);
00231       if (nGot != nWant) {
00232         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00233           << "Failed reading streamer file, first read in readEventMessage\n"
00234           << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00235       }
00236       HeaderView head(&eventBuf_[0]);
00237       uint32 code = head.code();
00238 
00239       // When we get the EOF record we know we have read all events
00240       // normally and are at the end, return 0 to indicate this
00241       if (code == Header::EOFRECORD) {
00242         endOfFile_ = true;
00243         return 0;
00244       }
00245       // If it is not an event nor EOFRECORD then something is wrong.
00246       if (code != Header::EVENT) {
00247         throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00248           << "Failed reading streamer file, unknown code in event header\n"
00249           << "code = " << code << "\n";
00250       }
00251       uint32 eventSize = head.size();
00252       if (eventSize <= sizeof(EventHeader)) {
00253         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00254           << "Failed reading streamer file, event header size from data too small\n";
00255       }
00256       eventRead = true;
00257       if (eventSkipperByID_) {
00258         EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00259         if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00260           eventRead = false;
00261         }
00262       }
00263       if (eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
00264         eventRead = false;
00265         --(*numberOfEventsToSkip_);
00266       }
00267       nWant = eventSize - sizeof(EventHeader);
00268       if (eventRead) {
00269         if (eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00270         nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00271         if (nGot != nWant) {
00272           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00273             << "Failed reading streamer file, second read in readEventMessage\n"
00274             << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00275         }
00276       } else {
00277         nGot = skipBytes(nWant);
00278         if (nGot != nWant) {
00279           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00280             << "Failed reading streamer file, skip event in readEventMessage\n"
00281             << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00282         }
00283       }
00284     }
00285     currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00286     return 1;
00287   }
00288 
00289   void StreamerInputFile::logFileAction(char const* msg) {
00290     LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00291     FlushMessageLog();
00292   }
00293 }