CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC2/src/IOPool/Streamer/src/StreamerInputFile.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002 
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 #include "FWCore/Sources/interface/EventSkipperByID.h"
00005 #include "FWCore/Utilities/interface/DebugMacros.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 #include "FWCore/Utilities/interface/Exception.h"
00008 #include "FWCore/Utilities/interface/TimeOfDay.h"
00009 
00010 #include "Utilities/StorageFactory/interface/IOFlags.h"
00011 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00012 
00013 #include <iomanip>
00014 #include <iostream>
00015 
00016 namespace edm {
00017 
00018   StreamerInputFile::~StreamerInputFile() {
00019     closeStreamerFile();
00020   }
00021 
00022   StreamerInputFile::StreamerInputFile(std::string const& name,
00023                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00024     startMsg_(),
00025     currentEvMsg_(),
00026     headerBuf_(1000*1000),
00027     eventBuf_(1000*1000*7),
00028     currentFile_(0),
00029     streamerNames_(),
00030     multiStreams_(false),
00031     currentFileName_(),
00032     currentFileOpen_(false),
00033     eventSkipperByID_(eventSkipperByID),
00034     currRun_(0),
00035     currProto_(0),
00036     newHeader_(false),
00037     storage_(),
00038     endOfFile_(false) {
00039     openStreamerFile(name);
00040     readStartMessage();
00041   }
00042 
00043   StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00044                                        boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00045     startMsg_(),
00046     currentEvMsg_(),
00047     headerBuf_(1000*1000),
00048     eventBuf_(1000*1000*7),
00049     currentFile_(0),
00050     streamerNames_(names),
00051     multiStreams_(true),
00052     currentFileName_(),
00053     currentFileOpen_(false),
00054     eventSkipperByID_(eventSkipperByID),
00055     currRun_(0),
00056     currProto_(0),
00057     newHeader_(false),
00058     endOfFile_(false) {
00059     openStreamerFile(names.at(0));
00060     ++currentFile_;
00061     readStartMessage();
00062     currRun_ = startMsg_->run();
00063     currProto_ = startMsg_->protocolVersion();
00064   }
00065 
00066   void
00067   StreamerInputFile::openStreamerFile(std::string const& name) {
00068 
00069     closeStreamerFile();
00070 
00071     currentFileName_ = name;
00072     logFileAction("  Initiating request to open file ");
00073 
00074     IOOffset size = -1;
00075     if(StorageFactory::get()->check(name.c_str(), &size)) {
00076       try {
00077         storage_.reset(StorageFactory::get()->open(name.c_str(),
00078                                                    IOFlags::OpenRead));
00079       }
00080       catch(cms::Exception& e) {
00081         Exception ex(errors::FileOpenError, "", e);
00082         ex.addContext("Calling StreamerInputFile::openStreamerFile()");
00083         ex.clearMessage();
00084         ex <<  "Error Opening Streamer Input File: " << name << "\n";
00085         throw ex;
00086       }
00087     } else {
00088       throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00089         << "Error Opening Streamer Input File, file does not exist: "
00090         << name << "\n";
00091     }
00092     currentFileOpen_ = true;
00093     logFileAction("  Successfully opened file ");
00094   }
00095 
00096   void
00097   StreamerInputFile::closeStreamerFile() {
00098     if(currentFileOpen_ && storage_) {
00099       storage_->close();
00100       logFileAction("  Closed file ");
00101     }
00102     currentFileOpen_ = false;
00103   }
00104 
00105   IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00106     IOSize n = 0;
00107     try {
00108       n = storage_->read(buf, nBytes);
00109     }
00110     catch(cms::Exception& ce) {
00111       Exception ex(errors::FileReadError, "", ce);
00112       ex.addContext("Calling StreamerInputFile::readBytes()");
00113       throw ex;
00114     }
00115     return n;
00116   }
00117 
00118   IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00119     IOOffset n = 0;
00120     try {
00121       // We wish to return the number of bytes skipped, not the final offset.
00122       n = storage_->position(0, Storage::CURRENT);
00123       n = storage_->position(nBytes, Storage::CURRENT) - n;
00124     }
00125     catch(cms::Exception& ce) {
00126       Exception ex(errors::FileReadError, "", ce);
00127       ex.addContext("Calling StreamerInputFile::skipBytes()");
00128       throw ex;
00129     }
00130     return n;
00131   }
00132 
00133   void StreamerInputFile::readStartMessage() {
00134     IOSize nWant = sizeof(HeaderView);
00135     IOSize nGot = readBytes(&headerBuf_[0], nWant);
00136     if(nGot != nWant) {
00137       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00138         << "Failed reading streamer file, first read in readStartMessage\n";
00139     }
00140 
00141     HeaderView head(&headerBuf_[0]);
00142     uint32 code = head.code();
00143     if(code != Header::INIT) 
00144     {
00145       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00146         << "Expecting an init Message at start of file\n";
00147       return;
00148     }
00149 
00150     uint32 headerSize = head.size();
00151     if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00152 
00153     if(headerSize > sizeof(HeaderView)) {
00154       nWant = headerSize - sizeof(HeaderView);
00155       nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00156       if(nGot != nWant) {
00157         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00158           << "Failed reading streamer file, second read in readStartMessage\n";
00159       }
00160     } else {
00161       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00162         << "Failed reading streamer file, init header size from data too small\n";
00163     }
00164 
00165     startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00166   }
00167 
00168   bool StreamerInputFile::next() {
00169     if(this->readEventMessage()) {
00170          return true;
00171     }
00172     if(multiStreams_) {
00173        //Try opening next file
00174        if(openNextFile()) {
00175           endOfFile_ = false;
00176           if(this->readEventMessage()) {
00177              return true;
00178           }
00179        }
00180     }
00181     return false;
00182   }
00183 
00184   bool StreamerInputFile::openNextFile() {
00185 
00186      if(currentFile_ <= streamerNames_.size() - 1) {
00187        FDEBUG(10) << "Opening file "
00188                   << streamerNames_.at(currentFile_).c_str() << std::endl;
00189 
00190        openStreamerFile(streamerNames_.at(currentFile_));
00191 
00192        // If start message was already there, then compare the
00193        // previous and new headers
00194        if(startMsg_) {
00195           FDEBUG(10) << "Comparing Header" << std::endl;
00196           if(!compareHeader()) {
00197               return false;
00198           }
00199        }
00200        ++currentFile_;
00201        return true;
00202      }
00203      return false;
00204   }
00205 
00206   bool StreamerInputFile::compareHeader() {
00207 
00208     //Get the new header
00209     readStartMessage();
00210 
00211     //Values from new Header should match up
00212     if(currRun_ != startMsg_->run() ||
00213         currProto_ != startMsg_->protocolVersion()) {
00214       throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00215         << "File " << streamerNames_.at(currentFile_)
00216         << "\nhas different run number or protocol version than previous\n";
00217       return false;
00218     }
00219     newHeader_ = true;
00220     return true;
00221   }
00222 
00223 
00224   int StreamerInputFile::readEventMessage() {
00225     if(endOfFile_) return 0;
00226 
00227     bool eventRead = false;
00228     while(!eventRead) {
00229 
00230       IOSize nWant = sizeof(EventHeader);
00231       IOSize nGot = readBytes(&eventBuf_[0], nWant);
00232       if(nGot != nWant) {
00233         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00234           << "Failed reading streamer file, first read in readEventMessage\n"
00235           << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00236       }
00237       HeaderView head(&eventBuf_[0]);
00238       uint32 code = head.code();
00239 
00240       // When we get the EOF record we know we have read all events
00241       // normally and are at the end, return 0 to indicate this
00242       if(code == Header::EOFRECORD) {
00243         endOfFile_ = true;
00244         return 0;
00245       }
00246       // If it is not an event nor EOFRECORD then something is wrong.
00247       if(code != Header::EVENT) {
00248         throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00249           << "Failed reading streamer file, unknown code in event header\n"
00250           << "code = " << code << "\n";
00251       }
00252       uint32 eventSize = head.size();
00253       if(eventSize <= sizeof(EventHeader)) {
00254         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00255           << "Failed reading streamer file, event header size from data too small\n";
00256       }
00257       eventRead = true;
00258       if(eventSkipperByID_) {
00259         EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00260         if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00261           eventRead = false;
00262         }
00263       }
00264       nWant = eventSize - sizeof(EventHeader);
00265       if(eventRead) {
00266         if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00267         nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00268         if(nGot != nWant) {
00269           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00270             << "Failed reading streamer file, second read in readEventMessage\n"
00271             << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00272         }
00273       } else {
00274         nGot = skipBytes(nWant);
00275         if(nGot != nWant) {
00276           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00277             << "Failed reading streamer file, skip event in readEventMessage\n"
00278             << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00279         }
00280       }
00281     }
00282     currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00283     return 1;
00284   }
00285 
00286   bool StreamerInputFile::eofRecordMessage(uint32 const& hlt_path_cnt, EOFRecordView*& view) {
00287     if(!endOfFile_) return false;
00288 
00289     HeaderView head(&eventBuf_[0]);
00290     uint32 code = head.code();
00291     
00292     if(code != Header::EOFRECORD) {
00293       return false;
00294     }
00295     
00296     uint32 eofSize = head.size();
00297     IOSize nWant = eofSize - sizeof(EventHeader);
00298     if(nWant>0) {
00299       IOSize nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00300       if(nGot != nWant) {
00301           throw Exception(errors::FileReadError, "StreamerInputFile::eofRecordMessage")
00302             << "Failed reading streamer file, second read in eofRecordMessage\n"
00303             << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00304       }
00305     }
00306 
00307     view = new EOFRecordView(&eventBuf_[0], hlt_path_cnt);
00308     return true;
00309   }
00310 
00311   void StreamerInputFile::logFileAction(char const* msg) {
00312     LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00313     FlushMessageLog();
00314   }
00315 }