00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 #include "FWCore/Sources/interface/EventSkipperByID.h"
00005 #include "FWCore/Utilities/interface/DebugMacros.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 #include "FWCore/Utilities/interface/Exception.h"
00008 #include "FWCore/Utilities/interface/TimeOfDay.h"
00009
00010 #include "Utilities/StorageFactory/interface/IOFlags.h"
00011 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00012
00013 #include <iomanip>
00014 #include <iostream>
00015
00016 namespace edm {
00017
00018 StreamerInputFile::~StreamerInputFile() {
00019 if(storage_) {
00020 storage_->close();
00021 if(currentFileOpen_) logFileAction(" Closed file ");
00022 }
00023 }
00024
00025 StreamerInputFile::StreamerInputFile(std::string const& name,
00026 int* numberOfEventsToSkip,
00027 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00028 startMsg_(),
00029 currentEvMsg_(),
00030 headerBuf_(1000*1000),
00031 eventBuf_(1000*1000*7),
00032 currentFile_(0),
00033 streamerNames_(),
00034 multiStreams_(false),
00035 currentFileName_(),
00036 currentFileOpen_(false),
00037 eventSkipperByID_(eventSkipperByID),
00038 numberOfEventsToSkip_(numberOfEventsToSkip),
00039 currRun_(0),
00040 currProto_(0),
00041 newHeader_(false),
00042 storage_(),
00043 endOfFile_(false) {
00044 openStreamerFile(name);
00045 readStartMessage();
00046 }
00047
00048 StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00049 int* numberOfEventsToSkip,
00050 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00051 startMsg_(),
00052 currentEvMsg_(),
00053 headerBuf_(1000*1000),
00054 eventBuf_(1000*1000*7),
00055 currentFile_(0),
00056 streamerNames_(names),
00057 multiStreams_(true),
00058 currentFileName_(),
00059 currentFileOpen_(false),
00060 eventSkipperByID_(eventSkipperByID),
00061 numberOfEventsToSkip_(numberOfEventsToSkip),
00062 currRun_(0),
00063 currProto_(0),
00064 newHeader_(false),
00065 endOfFile_(false) {
00066 openStreamerFile(names.at(0));
00067 ++currentFile_;
00068 readStartMessage();
00069 currRun_ = startMsg_->run();
00070 currProto_ = startMsg_->protocolVersion();
00071 }
00072
00073 void
00074 StreamerInputFile::openStreamerFile(std::string const& name) {
00075
00076 if(storage_) {
00077 storage_->close();
00078 if(currentFileOpen_) logFileAction(" Closed file ");
00079 }
00080
00081 currentFileName_ = name;
00082 currentFileOpen_ = false;
00083 logFileAction(" Initiating request to open file ");
00084
00085 IOOffset size = -1;
00086 if(StorageFactory::get()->check(name.c_str(), &size)) {
00087 try {
00088 storage_.reset(StorageFactory::get()->open(name.c_str(),
00089 IOFlags::OpenRead));
00090 }
00091 catch(cms::Exception& e) {
00092 Exception ex(errors::FileOpenError, "", e);
00093 ex.addContext("Calling StreamerInputFile::openStreamerFile()");
00094 ex.clearMessage();
00095 ex << "Error Opening Streamer Input File: " << name << "\n";
00096 throw ex;
00097 }
00098 } else {
00099 throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00100 << "Error Opening Streamer Input File, file does not exist: "
00101 << name << "\n";
00102 }
00103 currentFileOpen_ = true;
00104 logFileAction(" Successfully opened file ");
00105 }
00106
00107 IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00108 IOSize n = 0;
00109 try {
00110 n = storage_->read(buf, nBytes);
00111 }
00112 catch(cms::Exception& ce) {
00113 Exception ex(errors::FileReadError, "", ce);
00114 ex.addContext("Calling StreamerInputFile::readBytes()");
00115 throw ex;
00116 }
00117 return n;
00118 }
00119
00120 IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00121 IOOffset n = 0;
00122 try {
00123
00124 n = storage_->position(0, Storage::CURRENT);
00125 n = storage_->position(nBytes, Storage::CURRENT) - n;
00126 }
00127 catch(cms::Exception& ce) {
00128 Exception ex(errors::FileReadError, "", ce);
00129 ex.addContext("Calling StreamerInputFile::skipBytes()");
00130 throw ex;
00131 }
00132 return n;
00133 }
00134
00135 void StreamerInputFile::readStartMessage() {
00136 IOSize nWant = sizeof(HeaderView);
00137 IOSize nGot = readBytes(&headerBuf_[0], nWant);
00138 if(nGot != nWant) {
00139 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00140 << "Failed reading streamer file, first read in readStartMessage\n";
00141 }
00142
00143 HeaderView head(&headerBuf_[0]);
00144 uint32 code = head.code();
00145 if(code != Header::INIT)
00146 {
00147 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00148 << "Expecting an init Message at start of file\n";
00149 return;
00150 }
00151
00152 uint32 headerSize = head.size();
00153 if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00154
00155 if(headerSize > sizeof(HeaderView)) {
00156 nWant = headerSize - sizeof(HeaderView);
00157 nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00158 if(nGot != nWant) {
00159 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00160 << "Failed reading streamer file, second read in readStartMessage\n";
00161 }
00162 } else {
00163 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00164 << "Failed reading streamer file, init header size from data too small\n";
00165 }
00166
00167 startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00168 }
00169
00170 bool StreamerInputFile::next() {
00171 if(this->readEventMessage()) {
00172 return true;
00173 }
00174 if(multiStreams_) {
00175
00176 if(openNextFile()) {
00177 endOfFile_ = false;
00178 if(this->readEventMessage()) {
00179 return true;
00180 }
00181 }
00182 }
00183 return false;
00184 }
00185
00186 bool StreamerInputFile::openNextFile() {
00187
00188 if(currentFile_ <= streamerNames_.size() - 1) {
00189 FDEBUG(10) << "Opening file "
00190 << streamerNames_.at(currentFile_).c_str() << std::endl;
00191
00192 openStreamerFile(streamerNames_.at(currentFile_));
00193
00194
00195
00196 if(startMsg_) {
00197 FDEBUG(10) << "Comparing Header" << std::endl;
00198 if(!compareHeader()) {
00199 return false;
00200 }
00201 }
00202 ++currentFile_;
00203 return true;
00204 }
00205 return false;
00206 }
00207
00208 bool StreamerInputFile::compareHeader() {
00209
00210
00211 readStartMessage();
00212
00213
00214 if(currRun_ != startMsg_->run() ||
00215 currProto_ != startMsg_->protocolVersion()) {
00216 throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00217 << "File " << streamerNames_.at(currentFile_)
00218 << "\nhas different run number or protocol version than previous\n";
00219 return false;
00220 }
00221 newHeader_ = true;
00222 return true;
00223 }
00224
00225
00226 int StreamerInputFile::readEventMessage() {
00227 if(endOfFile_) return 0;
00228
00229 bool eventRead = false;
00230 while(!eventRead) {
00231
00232 IOSize nWant = sizeof(EventHeader);
00233 IOSize nGot = readBytes(&eventBuf_[0], nWant);
00234 if(nGot != nWant) {
00235 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00236 << "Failed reading streamer file, first read in readEventMessage\n"
00237 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00238 }
00239 HeaderView head(&eventBuf_[0]);
00240 uint32 code = head.code();
00241
00242
00243
00244 if(code == Header::EOFRECORD) {
00245 endOfFile_ = true;
00246 return 0;
00247 }
00248
00249 if(code != Header::EVENT) {
00250 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00251 << "Failed reading streamer file, unknown code in event header\n"
00252 << "code = " << code << "\n";
00253 }
00254 uint32 eventSize = head.size();
00255 if(eventSize <= sizeof(EventHeader)) {
00256 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00257 << "Failed reading streamer file, event header size from data too small\n";
00258 }
00259 eventRead = true;
00260 if(eventSkipperByID_) {
00261 EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00262 if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00263 eventRead = false;
00264 }
00265 }
00266 if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
00267 eventRead = false;
00268 --(*numberOfEventsToSkip_);
00269 }
00270 nWant = eventSize - sizeof(EventHeader);
00271 if(eventRead) {
00272 if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00273 nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00274 if(nGot != nWant) {
00275 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00276 << "Failed reading streamer file, second read in readEventMessage\n"
00277 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00278 }
00279 } else {
00280 nGot = skipBytes(nWant);
00281 if(nGot != nWant) {
00282 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00283 << "Failed reading streamer file, skip event in readEventMessage\n"
00284 << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00285 }
00286 }
00287 }
00288 currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00289 return 1;
00290 }
00291
00292 void StreamerInputFile::logFileAction(char const* msg) {
00293 LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00294 FlushMessageLog();
00295 }
00296 }