00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 #include "FWCore/Sources/interface/EventSkipperByID.h"
00005 #include "FWCore/Utilities/interface/DebugMacros.h"
00006 #include "FWCore/Utilities/interface/EDMException.h"
00007 #include "FWCore/Utilities/interface/Exception.h"
00008 #include "FWCore/Utilities/interface/TimeOfDay.h"
00009
00010 #include "Utilities/StorageFactory/interface/IOFlags.h"
00011 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00012
00013 #include <iomanip>
00014 #include <iostream>
00015
00016 namespace edm {
00017
00018 StreamerInputFile::~StreamerInputFile() {
00019 if(storage_) {
00020 storage_->close();
00021 if(currentFileOpen_) logFileAction(" Closed file ");
00022 }
00023 }
00024
00025 StreamerInputFile::StreamerInputFile(std::string const& name,
00026 int* numberOfEventsToSkip,
00027 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00028 startMsg_(),
00029 currentEvMsg_(),
00030 headerBuf_(1000*1000),
00031 eventBuf_(1000*1000*7),
00032 multiStreams_(false),
00033 currentFileName_(),
00034 currentFileOpen_(false),
00035 eventSkipperByID_(eventSkipperByID),
00036 numberOfEventsToSkip_(numberOfEventsToSkip),
00037 newHeader_(false),
00038 endOfFile_(false) {
00039 openStreamerFile(name);
00040 readStartMessage();
00041 }
00042
00043
00044 StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00045 int* numberOfEventsToSkip,
00046 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00047 startMsg_(),
00048 currentEvMsg_(),
00049 headerBuf_(1000*1000),
00050 eventBuf_(1000*1000*7),
00051 currentFile_(0),
00052 streamerNames_(names),
00053 multiStreams_(true),
00054 currentFileName_(),
00055 currentFileOpen_(false),
00056 eventSkipperByID_(eventSkipperByID),
00057 numberOfEventsToSkip_(numberOfEventsToSkip),
00058 currRun_(0),
00059 currProto_(0),
00060 newHeader_(false),
00061 endOfFile_(false) {
00062 openStreamerFile(names.at(0));
00063 ++currentFile_;
00064 readStartMessage();
00065 currRun_ = startMsg_->run();
00066 currProto_ = startMsg_->protocolVersion();
00067 }
00068
00069 void
00070 StreamerInputFile::openStreamerFile(std::string const& name) {
00071
00072 if(storage_) {
00073 storage_->close();
00074 if(currentFileOpen_) logFileAction(" Closed file ");
00075 }
00076
00077 currentFileName_ = name;
00078 currentFileOpen_ = false;
00079 logFileAction(" Initiating request to open file ");
00080
00081 IOOffset size = -1;
00082 if(StorageFactory::get()->check(name.c_str(), &size)) {
00083 try {
00084 storage_.reset(StorageFactory::get()->open(name.c_str(),
00085 IOFlags::OpenRead));
00086 }
00087 catch(cms::Exception& e) {
00088 Exception ex(errors::FileOpenError, "", e);
00089 ex.addContext("Calling StreamerInputFile::openStreamerFile()");
00090 ex.clearMessage();
00091 ex << "Error Opening Streamer Input File: " << name << "\n";
00092 throw ex;
00093 }
00094 } else {
00095 throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00096 << "Error Opening Streamer Input File, file does not exist: "
00097 << name << "\n";
00098 }
00099 currentFileOpen_ = true;
00100 logFileAction(" Successfully opened file ");
00101 }
00102
00103 IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00104 IOSize n = 0;
00105 try {
00106 n = storage_->read(buf, nBytes);
00107 }
00108 catch(cms::Exception& ce) {
00109 Exception ex(errors::FileReadError, "", ce);
00110 ex.addContext("Calling StreamerInputFile::readBytes()");
00111 throw ex;
00112 }
00113 return n;
00114 }
00115
00116 IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00117 IOOffset n = 0;
00118 try {
00119
00120 n = storage_->position(0, Storage::CURRENT);
00121 n = storage_->position(nBytes, Storage::CURRENT) - n;
00122 }
00123 catch(cms::Exception& ce) {
00124 Exception ex(errors::FileReadError, "", ce);
00125 ex.addContext("Calling StreamerInputFile::skipBytes()");
00126 throw ex;
00127 }
00128 return n;
00129 }
00130
00131 void StreamerInputFile::readStartMessage() {
00132 IOSize nWant = sizeof(HeaderView);
00133 IOSize nGot = readBytes(&headerBuf_[0], nWant);
00134 if(nGot != nWant) {
00135 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00136 << "Failed reading streamer file, first read in readStartMessage\n";
00137 }
00138
00139 HeaderView head(&headerBuf_[0]);
00140 uint32 code = head.code();
00141 if(code != Header::INIT)
00142 {
00143 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00144 << "Expecting an init Message at start of file\n";
00145 return;
00146 }
00147
00148 uint32 headerSize = head.size();
00149 if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00150
00151 if(headerSize > sizeof(HeaderView)) {
00152 nWant = headerSize - sizeof(HeaderView);
00153 nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00154 if(nGot != nWant) {
00155 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00156 << "Failed reading streamer file, second read in readStartMessage\n";
00157 }
00158 } else {
00159 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00160 << "Failed reading streamer file, init header size from data too small\n";
00161 }
00162
00163 startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00164 }
00165
00166 bool StreamerInputFile::next() {
00167 if(this->readEventMessage()) {
00168 return true;
00169 }
00170 if(multiStreams_) {
00171
00172 if(openNextFile()) {
00173 endOfFile_ = false;
00174 if(this->readEventMessage()) {
00175 return true;
00176 }
00177 }
00178 }
00179 return false;
00180 }
00181
00182 bool StreamerInputFile::openNextFile() {
00183
00184 if(currentFile_ <= streamerNames_.size() - 1) {
00185 FDEBUG(10) << "Opening file "
00186 << streamerNames_.at(currentFile_).c_str() << std::endl;
00187
00188 openStreamerFile(streamerNames_.at(currentFile_));
00189
00190
00191
00192 if(startMsg_) {
00193 FDEBUG(10) << "Comparing Header" << std::endl;
00194 if(!compareHeader()) {
00195 return false;
00196 }
00197 }
00198 ++currentFile_;
00199 return true;
00200 }
00201 return false;
00202 }
00203
00204 bool StreamerInputFile::compareHeader() {
00205
00206
00207 readStartMessage();
00208
00209
00210 if(currRun_ != startMsg_->run() ||
00211 currProto_ != startMsg_->protocolVersion()) {
00212 throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00213 << "File " << streamerNames_.at(currentFile_)
00214 << "\nhas different run number or protocol version than previous\n";
00215 return false;
00216 }
00217 newHeader_ = true;
00218 return true;
00219 }
00220
00221
00222 int StreamerInputFile::readEventMessage() {
00223 if(endOfFile_) return 0;
00224
00225 bool eventRead = false;
00226 while(!eventRead) {
00227
00228 IOSize nWant = sizeof(EventHeader);
00229 IOSize nGot = readBytes(&eventBuf_[0], nWant);
00230 if(nGot != nWant) {
00231 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00232 << "Failed reading streamer file, first read in readEventMessage\n"
00233 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00234 }
00235 HeaderView head(&eventBuf_[0]);
00236 uint32 code = head.code();
00237
00238
00239
00240 if(code == Header::EOFRECORD) {
00241 endOfFile_ = true;
00242 return 0;
00243 }
00244
00245 if(code != Header::EVENT) {
00246 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00247 << "Failed reading streamer file, unknown code in event header\n"
00248 << "code = " << code << "\n";
00249 }
00250 uint32 eventSize = head.size();
00251 if(eventSize <= sizeof(EventHeader)) {
00252 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00253 << "Failed reading streamer file, event header size from data too small\n";
00254 }
00255 eventRead = true;
00256 if(eventSkipperByID_) {
00257 EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00258 if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00259 eventRead = false;
00260 }
00261 }
00262 if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
00263 eventRead = false;
00264 --(*numberOfEventsToSkip_);
00265 }
00266 nWant = eventSize - sizeof(EventHeader);
00267 if(eventRead) {
00268 if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00269 nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00270 if(nGot != nWant) {
00271 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00272 << "Failed reading streamer file, second read in readEventMessage\n"
00273 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00274 }
00275 } else {
00276 nGot = skipBytes(nWant);
00277 if(nGot != nWant) {
00278 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00279 << "Failed reading streamer file, skip event in readEventMessage\n"
00280 << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00281 }
00282 }
00283 }
00284 currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00285 return 1;
00286 }
00287
00288 void StreamerInputFile::logFileAction(char const* msg) {
00289 LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00290 FlushMessageLog();
00291 }
00292 }