#include <StreamerInputFile.h>
Public Member Functions | |
EventMsgView const * | currentRecord () const |
bool | newHeader () |
bool | next () |
InitMsgView const * | startMessage () const |
StreamerInputFile (std::vector< std::string > const &names, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >()) | |
StreamerInputFile (std::string const &name, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >()) | |
~StreamerInputFile () | |
Private Member Functions | |
bool | compareHeader () |
void | logFileAction (char const *msg) |
bool | openNextFile () |
void | openStreamerFile (std::string const &name) |
IOSize | readBytes (char *buf, IOSize nBytes) |
int | readEventMessage () |
void | readStartMessage () |
IOOffset | skipBytes (IOSize nBytes) |
Private Attributes | |
boost::shared_ptr< EventMsgView > | currentEvMsg_ |
unsigned int | currentFile_ |
std::string | currentFileName_ |
bool | currentFileOpen_ |
uint32 | currProto_ |
uint32 | currRun_ |
bool | endOfFile_ |
std::vector< char > | eventBuf_ |
boost::shared_ptr < EventSkipperByID > | eventSkipperByID_ |
std::vector< char > | headerBuf_ |
bool | multiStreams_ |
bool | newHeader_ |
int * | numberOfEventsToSkip_ |
boost::shared_ptr< InitMsgView > | startMsg_ |
boost::shared_ptr< Storage > | storage_ |
std::vector< std::string > | streamerNames_ |
Definition at line 17 of file StreamerInputFile.h.
edm::StreamerInputFile::StreamerInputFile | ( | std::string const & | name, |
int * | numberOfEventsToSkip = 0 , |
||
boost::shared_ptr< EventSkipperByID > | eventSkipperByID = boost::shared_ptr<EventSkipperByID>() |
||
) | [explicit] |
Reads a Streamer file
Definition at line 25 of file StreamerInputFile.cc.
References openStreamerFile(), and readStartMessage().
: startMsg_(), currentEvMsg_(), headerBuf_(1000*1000), eventBuf_(1000*1000*7), currentFile_(0), streamerNames_(), multiStreams_(false), currentFileName_(), currentFileOpen_(false), eventSkipperByID_(eventSkipperByID), numberOfEventsToSkip_(numberOfEventsToSkip), currRun_(0), currProto_(0), newHeader_(false), storage_(), endOfFile_(false) { openStreamerFile(name); readStartMessage(); }
edm::StreamerInputFile::StreamerInputFile | ( | std::vector< std::string > const & | names, |
int * | numberOfEventsToSkip = 0 , |
||
boost::shared_ptr< EventSkipperByID > | eventSkipperByID = boost::shared_ptr<EventSkipperByID>() |
||
) | [explicit] |
Multiple Streamer files
Definition at line 48 of file StreamerInputFile.cc.
References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.
: startMsg_(), currentEvMsg_(), headerBuf_(1000*1000), eventBuf_(1000*1000*7), currentFile_(0), streamerNames_(names), multiStreams_(true), currentFileName_(), currentFileOpen_(false), eventSkipperByID_(eventSkipperByID), numberOfEventsToSkip_(numberOfEventsToSkip), currRun_(0), currProto_(0), newHeader_(false), endOfFile_(false) { openStreamerFile(names.at(0)); ++currentFile_; readStartMessage(); currRun_ = startMsg_->run(); currProto_ = startMsg_->protocolVersion(); }
edm::StreamerInputFile::~StreamerInputFile | ( | ) |
Definition at line 18 of file StreamerInputFile.cc.
References currentFileOpen_, logFileAction(), and storage_.
{ if(storage_) { storage_->close(); if(currentFileOpen_) logFileAction(" Closed file "); } }
bool edm::StreamerInputFile::compareHeader | ( | ) | [private] |
Compares current File header with the newly opened file header Returns false in case of mismatch
Definition at line 208 of file StreamerInputFile.cc.
References currentFile_, currProto_, currRun_, Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.
Referenced by openNextFile().
{ //Get the new header readStartMessage(); //Values from new Header should match up if(currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) { throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader") << "File " << streamerNames_.at(currentFile_) << "\nhas different run number or protocol version than previous\n"; return false; } newHeader_ = true; return true; }
EventMsgView const* edm::StreamerInputFile::currentRecord | ( | ) | const [inline] |
Points to File Start Header/Message
Definition at line 37 of file StreamerInputFile.h.
References currentEvMsg_.
Referenced by WatcherStreamFileReader::getNextEvent().
{ return currentEvMsg_.get(); }
void edm::StreamerInputFile::logFileAction | ( | char const * | msg | ) | [private] |
Definition at line 292 of file StreamerInputFile.cc.
References currentFileName_, and edm::FlushMessageLog().
Referenced by openStreamerFile(), and ~StreamerInputFile().
{ LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_; FlushMessageLog(); }
bool edm::StreamerInputFile::newHeader | ( | ) | [inline] |
Points to current Record
Definition at line 40 of file StreamerInputFile.h.
References newHeader_, and tmp.
Referenced by WatcherStreamFileReader::newHeader().
{ bool tmp = newHeader_; newHeader_ = false; return tmp;}
bool edm::StreamerInputFile::next | ( | void | ) |
Definition at line 170 of file StreamerInputFile.cc.
References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().
Referenced by WatcherStreamFileReader::getNextEvent().
{ if(this->readEventMessage()) { return true; } if(multiStreams_) { //Try opening next file if(openNextFile()) { endOfFile_ = false; if(this->readEventMessage()) { return true; } } } return false; }
bool edm::StreamerInputFile::openNextFile | ( | ) | [private] |
Definition at line 186 of file StreamerInputFile.cc.
References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.
Referenced by next().
{ if(currentFile_ <= streamerNames_.size() - 1) { FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).c_str() << std::endl; openStreamerFile(streamerNames_.at(currentFile_)); // If start message was already there, then compare the // previous and new headers if(startMsg_) { FDEBUG(10) << "Comparing Header" << std::endl; if(!compareHeader()) { return false; } } ++currentFile_; return true; } return false; }
void edm::StreamerInputFile::openStreamerFile | ( | std::string const & | name | ) | [private] |
Test bit if a new header is encountered
Definition at line 74 of file StreamerInputFile.cc.
References cms::Exception::addContext(), edm::check(), cms::Exception::clearMessage(), currentFileName_, currentFileOpen_, alignCSCRings::e, Exception, edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), mergeVDriftHistosByStation::name, IOFlags::OpenRead, findQualityFiles::size, and storage_.
Referenced by openNextFile(), and StreamerInputFile().
{ if(storage_) { storage_->close(); if(currentFileOpen_) logFileAction(" Closed file "); } currentFileName_ = name; currentFileOpen_ = false; logFileAction(" Initiating request to open file "); IOOffset size = -1; if(StorageFactory::get()->check(name.c_str(), &size)) { try { storage_.reset(StorageFactory::get()->open(name.c_str(), IOFlags::OpenRead)); } catch(cms::Exception& e) { Exception ex(errors::FileOpenError, "", e); ex.addContext("Calling StreamerInputFile::openStreamerFile()"); ex.clearMessage(); ex << "Error Opening Streamer Input File: " << name << "\n"; throw ex; } } else { throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile") << "Error Opening Streamer Input File, file does not exist: " << name << "\n"; } currentFileOpen_ = true; logFileAction(" Successfully opened file "); }
unsigned int STLInputStream::readBytes | ( | char * | buf, |
IOSize | nBytes | ||
) | [private] |
Definition at line 107 of file StreamerInputFile.cc.
References cms::Exception::addContext(), edm::errors::FileReadError, n, and storage_.
Referenced by readEventMessage(), and readStartMessage().
{ IOSize n = 0; try { n = storage_->read(buf, nBytes); } catch(cms::Exception& ce) { Exception ex(errors::FileReadError, "", ce); ex.addContext("Calling StreamerInputFile::readBytes()"); throw ex; } return n; }
int edm::StreamerInputFile::readEventMessage | ( | ) | [private] |
Definition at line 226 of file StreamerInputFile.cc.
References HeaderView::code(), convert32(), currentEvMsg_, endOfFile_, Header::EOFRECORD, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, Exception, edm::errors::FileReadError, EventHeader::lumi_, numberOfEventsToSkip_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().
Referenced by next().
{ if(endOfFile_) return 0; bool eventRead = false; while(!eventRead) { IOSize nWant = sizeof(EventHeader); IOSize nGot = readBytes(&eventBuf_[0], nWant); if(nGot != nWant) { throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, first read in readEventMessage\n" << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n"; } HeaderView head(&eventBuf_[0]); uint32 code = head.code(); // When we get the EOF record we know we have read all events // normally and are at the end, return 0 to indicate this if(code == Header::EOFRECORD) { endOfFile_ = true; return 0; } // If it is not an event nor EOFRECORD then something is wrong. if(code != Header::EVENT) { throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, unknown code in event header\n" << "code = " << code << "\n"; } uint32 eventSize = head.size(); if(eventSize <= sizeof(EventHeader)) { throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, event header size from data too small\n"; } eventRead = true; if(eventSkipperByID_) { EventHeader *evh = (EventHeader *)(&eventBuf_[0]); if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) { eventRead = false; } } if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) { eventRead = false; --(*numberOfEventsToSkip_); } nWant = eventSize - sizeof(EventHeader); if(eventRead) { if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize); nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant); if(nGot != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, second read in readEventMessage\n" << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n"; } } else { nGot = skipBytes(nWant); if(nGot != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, skip event in readEventMessage\n" << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n"; } } } currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0])); return 1; }
void edm::StreamerInputFile::readStartMessage | ( | ) | [private] |
Not an init message should return
Definition at line 135 of file StreamerInputFile.cc.
References HeaderView::code(), Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.
Referenced by compareHeader(), and StreamerInputFile().
{ IOSize nWant = sizeof(HeaderView); IOSize nGot = readBytes(&headerBuf_[0], nWant); if(nGot != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") << "Failed reading streamer file, first read in readStartMessage\n"; } HeaderView head(&headerBuf_[0]); uint32 code = head.code(); if(code != Header::INIT) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") << "Expecting an init Message at start of file\n"; return; } uint32 headerSize = head.size(); if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize); if(headerSize > sizeof(HeaderView)) { nWant = headerSize - sizeof(HeaderView); nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant); if(nGot != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") << "Failed reading streamer file, second read in readStartMessage\n"; } } else { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") << "Failed reading streamer file, init header size from data too small\n"; } startMsg_.reset(new InitMsgView(&headerBuf_[0])); }
Definition at line 120 of file StreamerInputFile.cc.
References cms::Exception::addContext(), Storage::CURRENT, edm::errors::FileReadError, n, and storage_.
Referenced by readEventMessage().
{ IOOffset n = 0; try { // We wish to return the number of bytes skipped, not the final offset. n = storage_->position(0, Storage::CURRENT); n = storage_->position(nBytes, Storage::CURRENT) - n; } catch(cms::Exception& ce) { Exception ex(errors::FileReadError, "", ce); ex.addContext("Calling StreamerInputFile::skipBytes()"); throw ex; } return n; }
InitMsgView const* edm::StreamerInputFile::startMessage | ( | ) | const [inline] |
Moves the handler to next Event Record
Definition at line 34 of file StreamerInputFile.h.
References startMsg_.
Referenced by WatcherStreamFileReader::getHeader().
{ return startMsg_.get(); }
boost::shared_ptr<EventMsgView> edm::StreamerInputFile::currentEvMsg_ [private] |
Definition at line 60 of file StreamerInputFile.h.
Referenced by currentRecord(), and readEventMessage().
unsigned int edm::StreamerInputFile::currentFile_ [private] |
Definition at line 65 of file StreamerInputFile.h.
Referenced by compareHeader(), openNextFile(), and StreamerInputFile().
std::string edm::StreamerInputFile::currentFileName_ [private] |
True if Multiple Streams are Read
Definition at line 68 of file StreamerInputFile.h.
Referenced by logFileAction(), and openStreamerFile().
bool edm::StreamerInputFile::currentFileOpen_ [private] |
Definition at line 69 of file StreamerInputFile.h.
Referenced by openStreamerFile(), and ~StreamerInputFile().
uint32 edm::StreamerInputFile::currProto_ [private] |
Definition at line 76 of file StreamerInputFile.h.
Referenced by compareHeader(), and StreamerInputFile().
uint32 edm::StreamerInputFile::currRun_ [private] |
Definition at line 75 of file StreamerInputFile.h.
Referenced by compareHeader(), and StreamerInputFile().
bool edm::StreamerInputFile::endOfFile_ [private] |
Definition at line 82 of file StreamerInputFile.h.
Referenced by next(), and readEventMessage().
std::vector<char> edm::StreamerInputFile::eventBuf_ [private] |
Buffer to store file Header
Definition at line 63 of file StreamerInputFile.h.
Referenced by readEventMessage().
boost::shared_ptr<EventSkipperByID> edm::StreamerInputFile::eventSkipperByID_ [private] |
Definition at line 71 of file StreamerInputFile.h.
Referenced by readEventMessage().
std::vector<char> edm::StreamerInputFile::headerBuf_ [private] |
Definition at line 62 of file StreamerInputFile.h.
Referenced by readStartMessage().
bool edm::StreamerInputFile::multiStreams_ [private] |
bool edm::StreamerInputFile::newHeader_ [private] |
Definition at line 78 of file StreamerInputFile.h.
Referenced by compareHeader(), and newHeader().
int* edm::StreamerInputFile::numberOfEventsToSkip_ [private] |
Definition at line 73 of file StreamerInputFile.h.
Referenced by readEventMessage().
boost::shared_ptr<InitMsgView> edm::StreamerInputFile::startMsg_ [private] |
Definition at line 59 of file StreamerInputFile.h.
Referenced by compareHeader(), openNextFile(), readStartMessage(), startMessage(), and StreamerInputFile().
boost::shared_ptr<Storage> edm::StreamerInputFile::storage_ [private] |
Definition at line 80 of file StreamerInputFile.h.
Referenced by openStreamerFile(), readBytes(), skipBytes(), and ~StreamerInputFile().
std::vector<std::string> edm::StreamerInputFile::streamerNames_ [private] |
keeps track of which file is in use at the moment
Definition at line 66 of file StreamerInputFile.h.
Referenced by compareHeader(), and openNextFile().