00001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00002 #include "FWCore/Utilities/interface/Exception.h"
00003 #include "FWCore/Utilities/interface/EDMException.h"
00004 #include "FWCore/Utilities/interface/DebugMacros.h"
00005 #include "FWCore/Utilities/interface/TimeOfDay.h"
00006 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00008 #include "FWCore/Sources/interface/EventSkipperByID.h"
00009
00010 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00011 #include "Utilities/StorageFactory/interface/IOFlags.h"
00012
00013 #include <iomanip>
00014 #include <iostream>
00015
00016 namespace edm {
00017
00018 StreamerInputFile::~StreamerInputFile() {
00019 if (storage_) {
00020 storage_->close();
00021 if (currentFileOpen_) logFileAction(" Closed file ");
00022 }
00023 }
00024
00025 StreamerInputFile::StreamerInputFile(std::string const& name,
00026 int* numberOfEventsToSkip,
00027 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00028 startMsg_(),
00029 currentEvMsg_(),
00030 headerBuf_(1000*1000),
00031 eventBuf_(1000*1000*7),
00032 multiStreams_(false),
00033 currentFileName_(),
00034 currentFileOpen_(false),
00035 eventSkipperByID_(eventSkipperByID),
00036 numberOfEventsToSkip_(numberOfEventsToSkip),
00037 newHeader_(false),
00038 endOfFile_(false) {
00039 openStreamerFile(name);
00040 readStartMessage();
00041 }
00042
00043
00044 StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
00045 int* numberOfEventsToSkip,
00046 boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
00047 startMsg_(),
00048 currentEvMsg_(),
00049 headerBuf_(1000*1000),
00050 eventBuf_(1000*1000*7),
00051 currentFile_(0),
00052 streamerNames_(names),
00053 multiStreams_(true),
00054 currentFileName_(),
00055 currentFileOpen_(false),
00056 eventSkipperByID_(eventSkipperByID),
00057 numberOfEventsToSkip_(numberOfEventsToSkip),
00058 currRun_(0),
00059 currProto_(0),
00060 newHeader_(false),
00061 endOfFile_(false) {
00062 openStreamerFile(names.at(0));
00063 ++currentFile_;
00064 readStartMessage();
00065 currRun_ = startMsg_->run();
00066 currProto_ = startMsg_->protocolVersion();
00067 }
00068
00069 void
00070 StreamerInputFile::openStreamerFile(std::string const& name) {
00071
00072 if (storage_) {
00073 storage_->close();
00074 if (currentFileOpen_) logFileAction(" Closed file ");
00075 }
00076
00077 currentFileName_ = name;
00078 currentFileOpen_ = false;
00079 logFileAction(" Initiating request to open file ");
00080
00081 IOOffset size = -1;
00082 if (StorageFactory::get()->check(name.c_str(), &size)) {
00083 try {
00084 storage_.reset(StorageFactory::get()->open(name.c_str(),
00085 IOFlags::OpenRead));
00086 }
00087 catch (cms::Exception& e) {
00088 throw Exception(errors::FileOpenError,"StreamerInputFile::openStreamerFile")
00089 << "Error Opening Streamer Input File: " << name << "\n"
00090 << e.explainSelf() << "\n";
00091 }
00092 }
00093 else {
00094 throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
00095 << "Error Opening Streamer Input File, file does not exist: "
00096 << name << "\n";
00097 }
00098 currentFileOpen_ = true;
00099 logFileAction(" Successfully opened file ");
00100 }
00101
00102 IOSize StreamerInputFile::readBytes(char *buf, IOSize nBytes) {
00103 IOSize n = 0;
00104 try {
00105 n = storage_->read(buf, nBytes);
00106 }
00107 catch (cms::Exception& ce) {
00108 throw Exception(errors::FileReadError, "StreamerInputFile::readBytes")
00109 << "Failed reading streamer file in function readBytes\n"
00110 << ce.explainSelf() << "\n";
00111 }
00112 return n;
00113 }
00114
00115 IOOffset StreamerInputFile::skipBytes(IOSize nBytes) {
00116 IOOffset n = 0;
00117 try {
00118
00119 n = storage_->position(0, Storage::CURRENT);
00120 n = storage_->position(nBytes, Storage::CURRENT) - n;
00121 }
00122 catch (cms::Exception& ce) {
00123 throw Exception(errors::FileReadError, "StreamerInputFile::skipBytes")
00124 << "Failed reading streamer file in function skipBytes\n"
00125 << ce.explainSelf() << "\n";
00126 }
00127 return n;
00128 }
00129
00130
00131 void StreamerInputFile::readStartMessage() {
00132 IOSize nWant = sizeof(HeaderView);
00133 IOSize nGot = readBytes(&headerBuf_[0], nWant);
00134 if (nGot != nWant) {
00135 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00136 << "Failed reading streamer file, first read in readStartMessage\n";
00137 }
00138
00139 HeaderView head(&headerBuf_[0]);
00140 uint32 code = head.code();
00141 if (code != Header::INIT)
00142 {
00143 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00144 << "Expecting an init Message at start of file\n";
00145 return;
00146 }
00147
00148 uint32 headerSize = head.size();
00149 if (headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
00150
00151 if (headerSize > sizeof(HeaderView)) {
00152 nWant = headerSize - sizeof(HeaderView);
00153 nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
00154 if (nGot != nWant) {
00155 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00156 << "Failed reading streamer file, second read in readStartMessage\n";
00157 }
00158 }
00159 else {
00160 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
00161 << "Failed reading streamer file, init header size from data too small\n";
00162 }
00163
00164 startMsg_.reset(new InitMsgView(&headerBuf_[0]));
00165 }
00166
00167 bool StreamerInputFile::next() {
00168 if (this->readEventMessage()) {
00169 return true;
00170 }
00171 if (multiStreams_) {
00172
00173 if (openNextFile()) {
00174 endOfFile_ = false;
00175 if (this->readEventMessage()) {
00176 return true;
00177 }
00178 }
00179 }
00180 return false;
00181 }
00182
00183 bool StreamerInputFile::openNextFile() {
00184
00185 if (currentFile_ <= streamerNames_.size() - 1) {
00186 FDEBUG(10) << "Opening file "
00187 << streamerNames_.at(currentFile_).c_str() << std::endl;
00188
00189 openStreamerFile(streamerNames_.at(currentFile_));
00190
00191
00192
00193 if (startMsg_) {
00194 FDEBUG(10) << "Comparing Header" << std::endl;
00195 if (!compareHeader()) {
00196 return false;
00197 }
00198 }
00199 ++currentFile_;
00200 return true;
00201 }
00202 return false;
00203 }
00204
00205 bool StreamerInputFile::compareHeader() {
00206
00207
00208 readStartMessage();
00209
00210
00211 if (currRun_ != startMsg_->run() ||
00212 currProto_ != startMsg_->protocolVersion()) {
00213 throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
00214 << "File " << streamerNames_.at(currentFile_)
00215 << "\nhas different run number or protocol version than previous\n";
00216 return false;
00217 }
00218 newHeader_ = true;
00219 return true;
00220 }
00221
00222
00223 int StreamerInputFile::readEventMessage() {
00224 if (endOfFile_) return 0;
00225
00226 bool eventRead = false;
00227 while (!eventRead) {
00228
00229 IOSize nWant = sizeof(EventHeader);
00230 IOSize nGot = readBytes(&eventBuf_[0], nWant);
00231 if (nGot != nWant) {
00232 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00233 << "Failed reading streamer file, first read in readEventMessage\n"
00234 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00235 }
00236 HeaderView head(&eventBuf_[0]);
00237 uint32 code = head.code();
00238
00239
00240
00241 if (code == Header::EOFRECORD) {
00242 endOfFile_ = true;
00243 return 0;
00244 }
00245
00246 if (code != Header::EVENT) {
00247 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00248 << "Failed reading streamer file, unknown code in event header\n"
00249 << "code = " << code << "\n";
00250 }
00251 uint32 eventSize = head.size();
00252 if (eventSize <= sizeof(EventHeader)) {
00253 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00254 << "Failed reading streamer file, event header size from data too small\n";
00255 }
00256 eventRead = true;
00257 if (eventSkipperByID_) {
00258 EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
00259 if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
00260 eventRead = false;
00261 }
00262 }
00263 if (eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
00264 eventRead = false;
00265 --(*numberOfEventsToSkip_);
00266 }
00267 nWant = eventSize - sizeof(EventHeader);
00268 if (eventRead) {
00269 if (eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
00270 nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
00271 if (nGot != nWant) {
00272 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00273 << "Failed reading streamer file, second read in readEventMessage\n"
00274 << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
00275 }
00276 } else {
00277 nGot = skipBytes(nWant);
00278 if (nGot != nWant) {
00279 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
00280 << "Failed reading streamer file, skip event in readEventMessage\n"
00281 << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
00282 }
00283 }
00284 }
00285 currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
00286 return 1;
00287 }
00288
00289 void StreamerInputFile::logFileAction(char const* msg) {
00290 LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
00291 FlushMessageLog();
00292 }
00293 }