00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 #include "FWCore/Sources/interface/EventSkipperByID.h"
00005 #include "FWCore/Utilities/interface/DebugMacros.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 #include "FWCore/Utilities/interface/Exception.h"
00008 #include "FWCore/Utilities/interface/TimeOfDay.h"
00009
00010 #include "Utilities/StorageFactory/interface/IOFlags.h"
00011 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00012
00013 #include <iomanip>
00014 #include <iostream>
00015
00016 namespace edm {
00017
00018 StreamerInputFile::~StreamerInputFile() {
00019 closeStreamerFile();
00020 }
00021
00022 StreamerInputFile::StreamerInputFile(std::string const& name,
00023 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00024 startMsg_(),
00025 currentEvMsg_(),
00026 headerBuf_(1000*1000),
00027 eventBuf_(1000*1000*7),
00028 currentFile_(0),
00029 streamerNames_(),
00030 multiStreams_(false),
00031 currentFileName_(),
00032 currentFileOpen_(false),
00033 eventSkipperByID_(eventSkipperByID),
00034 currRun_(0),
00035 currProto_(0),
00036 newHeader_(false),
00037 storage_(),
00038 endOfFile_(false) {
00039 openStreamerFile(name);
00040 readStartMessage();
00041 }
00042
00043 StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00044 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00045 startMsg_(),
00046 currentEvMsg_(),
00047 headerBuf_(1000*1000),
00048 eventBuf_(1000*1000*7),
00049 currentFile_(0),
00050 streamerNames_(names),
00051 multiStreams_(true),
00052 currentFileName_(),
00053 currentFileOpen_(false),
00054 eventSkipperByID_(eventSkipperByID),
00055 currRun_(0),
00056 currProto_(0),
00057 newHeader_(false),
00058 endOfFile_(false) {
00059 openStreamerFile(names.at(0));
00060 ++currentFile_;
00061 readStartMessage();
00062 currRun_ = startMsg_->run();
00063 currProto_ = startMsg_->protocolVersion();
00064 }
00065
00066 void
00067 StreamerInputFile::openStreamerFile(std::string const& name) {
00068
00069 closeStreamerFile();
00070
00071 currentFileName_ = name;
00072 logFileAction(" Initiating request to open file ");
00073
00074 IOOffset size = -1;
00075 if(StorageFactory::get()->check(name.c_str(), &size)) {
00076 try {
00077 storage_.reset(StorageFactory::get()->open(name.c_str(),
00078 IOFlags::OpenRead));
00079 }
00080 catch(cms::Exception& e) {
00081 Exception ex(errors::FileOpenError, "", e);
00082 ex.addContext("Calling StreamerInputFile::openStreamerFile()");
00083 ex.clearMessage();
00084 ex << "Error Opening Streamer Input File: " << name << "\n";
00085 throw ex;
00086 }
00087 } else {
00088 throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00089 << "Error Opening Streamer Input File, file does not exist: "
00090 << name << "\n";
00091 }
00092 currentFileOpen_ = true;
00093 logFileAction(" Successfully opened file ");
00094 }
00095
00096 void
00097 StreamerInputFile::closeStreamerFile() {
00098 if(currentFileOpen_ && storage_) {
00099 storage_->close();
00100 logFileAction(" Closed file ");
00101 }
00102 currentFileOpen_ = false;
00103 }
00104
00105 IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00106 IOSize n = 0;
00107 try {
00108 n = storage_->read(buf, nBytes);
00109 }
00110 catch(cms::Exception& ce) {
00111 Exception ex(errors::FileReadError, "", ce);
00112 ex.addContext("Calling StreamerInputFile::readBytes()");
00113 throw ex;
00114 }
00115 return n;
00116 }
00117
00118 IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00119 IOOffset n = 0;
00120 try {
00121
00122 n = storage_->position(0, Storage::CURRENT);
00123 n = storage_->position(nBytes, Storage::CURRENT) - n;
00124 }
00125 catch(cms::Exception& ce) {
00126 Exception ex(errors::FileReadError, "", ce);
00127 ex.addContext("Calling StreamerInputFile::skipBytes()");
00128 throw ex;
00129 }
00130 return n;
00131 }
00132
00133 void StreamerInputFile::readStartMessage() {
00134 IOSize nWant = sizeof(HeaderView);
00135 IOSize nGot = readBytes(&headerBuf_[0], nWant);
00136 if(nGot != nWant) {
00137 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00138 << "Failed reading streamer file, first read in readStartMessage\n";
00139 }
00140
00141 HeaderView head(&headerBuf_[0]);
00142 uint32 code = head.code();
00143 if(code != Header::INIT)
00144 {
00145 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00146 << "Expecting an init Message at start of file\n";
00147 return;
00148 }
00149
00150 uint32 headerSize = head.size();
00151 if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00152
00153 if(headerSize > sizeof(HeaderView)) {
00154 nWant = headerSize - sizeof(HeaderView);
00155 nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00156 if(nGot != nWant) {
00157 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00158 << "Failed reading streamer file, second read in readStartMessage\n";
00159 }
00160 } else {
00161 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00162 << "Failed reading streamer file, init header size from data too small\n";
00163 }
00164
00165 startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00166 }
00167
00168 bool StreamerInputFile::next() {
00169 if(this->readEventMessage()) {
00170 return true;
00171 }
00172 if(multiStreams_) {
00173
00174 if(openNextFile()) {
00175 endOfFile_ = false;
00176 if(this->readEventMessage()) {
00177 return true;
00178 }
00179 }
00180 }
00181 return false;
00182 }
00183
00184 bool StreamerInputFile::openNextFile() {
00185
00186 if(currentFile_ <= streamerNames_.size() - 1) {
00187 FDEBUG(10) << "Opening file "
00188 << streamerNames_.at(currentFile_).c_str() << std::endl;
00189
00190 openStreamerFile(streamerNames_.at(currentFile_));
00191
00192
00193
00194 if(startMsg_) {
00195 FDEBUG(10) << "Comparing Header" << std::endl;
00196 if(!compareHeader()) {
00197 return false;
00198 }
00199 }
00200 ++currentFile_;
00201 return true;
00202 }
00203 return false;
00204 }
00205
00206 bool StreamerInputFile::compareHeader() {
00207
00208
00209 readStartMessage();
00210
00211
00212 if(currRun_ != startMsg_->run() ||
00213 currProto_ != startMsg_->protocolVersion()) {
00214 throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00215 << "File " << streamerNames_.at(currentFile_)
00216 << "\nhas different run number or protocol version than previous\n";
00217 return false;
00218 }
00219 newHeader_ = true;
00220 return true;
00221 }
00222
00223
00224 int StreamerInputFile::readEventMessage() {
00225 if(endOfFile_) return 0;
00226
00227 bool eventRead = false;
00228 while(!eventRead) {
00229
00230 IOSize nWant = sizeof(EventHeader);
00231 IOSize nGot = readBytes(&eventBuf_[0], nWant);
00232 if(nGot != nWant) {
00233 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00234 << "Failed reading streamer file, first read in readEventMessage\n"
00235 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00236 }
00237 HeaderView head(&eventBuf_[0]);
00238 uint32 code = head.code();
00239
00240
00241
00242 if(code == Header::EOFRECORD) {
00243 endOfFile_ = true;
00244 return 0;
00245 }
00246
00247 if(code != Header::EVENT) {
00248 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00249 << "Failed reading streamer file, unknown code in event header\n"
00250 << "code = " << code << "\n";
00251 }
00252 uint32 eventSize = head.size();
00253 if(eventSize <= sizeof(EventHeader)) {
00254 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00255 << "Failed reading streamer file, event header size from data too small\n";
00256 }
00257 eventRead = true;
00258 if(eventSkipperByID_) {
00259 EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00260 if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00261 eventRead = false;
00262 }
00263 }
00264 nWant = eventSize - sizeof(EventHeader);
00265 if(eventRead) {
00266 if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00267 nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00268 if(nGot != nWant) {
00269 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00270 << "Failed reading streamer file, second read in readEventMessage\n"
00271 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00272 }
00273 } else {
00274 nGot = skipBytes(nWant);
00275 if(nGot != nWant) {
00276 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00277 << "Failed reading streamer file, skip event in readEventMessage\n"
00278 << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00279 }
00280 }
00281 }
00282 currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00283 return 1;
00284 }
00285
00286 bool StreamerInputFile::eofRecordMessage(uint32 const& hlt_path_cnt, EOFRecordView*& view) {
00287 if(!endOfFile_) return false;
00288
00289 HeaderView head(&eventBuf_[0]);
00290 uint32 code = head.code();
00291
00292 if(code != Header::EOFRECORD) {
00293 return false;
00294 }
00295
00296 uint32 eofSize = head.size();
00297 IOSize nWant = eofSize - sizeof(EventHeader);
00298 if(nWant>0) {
00299 IOSize nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00300 if(nGot != nWant) {
00301 throw Exception(errors::FileReadError, "StreamerInputFile::eofRecordMessage")
00302 << "Failed reading streamer file, second read in eofRecordMessage\n"
00303 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00304 }
00305 }
00306
00307 view = new EOFRecordView(&eventBuf_[0], hlt_path_cnt);
00308 return true;
00309 }
00310
00311 void StreamerInputFile::logFileAction(char const* msg) {
00312 LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00313 FlushMessageLog();
00314 }
00315 }