CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

edm::StreamerInputFile Class Reference

#include <StreamerInputFile.h>

List of all members.

Public Member Functions

EventMsgView const * currentRecord () const
bool const newHeader ()
bool next ()
InitMsgView const * startMessage () const
 StreamerInputFile (std::vector< std::string > const &names, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 StreamerInputFile (std::string const &name, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 ~StreamerInputFile ()

Private Member Functions

bool compareHeader ()
void logFileAction (char const *msg)
bool openNextFile ()
void openStreamerFile (std::string const &name)
IOSize readBytes (char *buf, IOSize nBytes)
int readEventMessage ()
void readStartMessage ()
IOOffset skipBytes (IOSize nBytes)

Private Attributes

boost::shared_ptr< EventMsgView > currentEvMsg_
unsigned int currentFile_
std::string currentFileName_
bool currentFileOpen_
uint32 currProto_
uint32 currRun_
bool endOfFile_
std::vector< char > eventBuf_
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
std::vector< char > headerBuf_
bool multiStreams_
bool newHeader_
int * numberOfEventsToSkip_
boost::shared_ptr< InitMsgView > startMsg_
boost::shared_ptr< Storage > storage_
std::vector< std::string > streamerNames_

Detailed Description

Definition at line 17 of file StreamerInputFile.h.


Constructor & Destructor Documentation

edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
int *  numberOfEventsToSkip = 0,
boost::shared_ptr< EventSkipperByID >  eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
) [explicit]

Reads a Streamer file

Definition at line 25 of file StreamerInputFile.cc.

References openStreamerFile(), and readStartMessage().

                                                                                           :
    startMsg_(),
    currentEvMsg_(),
    headerBuf_(1000*1000),
    eventBuf_(1000*1000*7),
    // Not consulted while multiStreams_ is false, but zeroed so the object
    // never carries indeterminate values (matches the multi-file constructor).
    currentFile_(0),
    multiStreams_(false),
    currentFileName_(),
    currentFileOpen_(false),
    eventSkipperByID_(eventSkipperByID),
    numberOfEventsToSkip_(numberOfEventsToSkip),
    // Zeroed for the same reason as currentFile_ above.
    currRun_(0),
    currProto_(0),
    newHeader_(false),
    endOfFile_(false) {
    // Single-file mode: open the one file and read its INIT message right
    // away so that startMessage() is valid once construction succeeds.
    // Both calls throw on failure, so a constructed object always has a
    // start message.
    openStreamerFile(name);
    readStartMessage();
  }
edm::StreamerInputFile::StreamerInputFile ( std::vector< std::string > const &  names,
int *  numberOfEventsToSkip = 0,
boost::shared_ptr< EventSkipperByID >  eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
) [explicit]

Multiple Streamer files

Definition at line 44 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.

                                                                                           :
    startMsg_(),
    currentEvMsg_(),
    headerBuf_(1000*1000),    // initial header buffer size; readStartMessage() grows it when needed
    eventBuf_(1000*1000*7),   // initial event buffer size; readEventMessage() grows it when needed
    currentFile_(0),
    streamerNames_(names),    // private copy of the file list
    multiStreams_(true),      // lets next() advance to the following file
    currentFileName_(),
    currentFileOpen_(false),
    eventSkipperByID_(eventSkipperByID),
    numberOfEventsToSkip_(numberOfEventsToSkip),
    currRun_(0),
    currProto_(0),
    newHeader_(false),
    endOfFile_(false) {
    // Multi-file mode: open the first file immediately.
    // names.at(0) throws std::out_of_range when the list is empty.
    openStreamerFile(names.at(0));
    // currentFile_ now indexes the next file to be opened by openNextFile().
    ++currentFile_;
    readStartMessage();
    // Remember run number and protocol version of the first file;
    // compareHeader() checks every later file against these values.
    currRun_ = startMsg_->run();
    currProto_ = startMsg_->protocolVersion();
  }
edm::StreamerInputFile::~StreamerInputFile ( )

Definition at line 18 of file StreamerInputFile.cc.

References currentFileOpen_, logFileAction(), and storage_.

                                        {
    // Nothing to release when no storage object was ever attached.
    if(!storage_) {
      return;
    }
    storage_->close();
    // Report the close only if the last open attempt actually completed.
    if(currentFileOpen_) {
      logFileAction("  Closed file ");
    }
  }

Member Function Documentation

bool edm::StreamerInputFile::compareHeader ( ) [private]

Compares current File header with the newly opened file header Returns false in case of mismatch

Definition at line 204 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.

Referenced by openNextFile().

                                        {
    // Reads the INIT message of the file that was just opened (this replaces
    // startMsg_) and verifies it against the run number and protocol version
    // recorded from the first file.
    //
    // Returns true when the header matches; newHeader_ is then set so that
    // newHeader() reports the change once.
    // Throws Exception(errors::MismatchedInputFiles) on a mismatch; the
    // function never returns false.
    readStartMessage();

    // Values from the new header must match the ones seen so far.
    if(currRun_ != startMsg_->run() ||
        currProto_ != startMsg_->protocolVersion()) {
      // currentFile_ has not been advanced yet by openNextFile(), so it still
      // indexes the file whose header was just read.
      throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
        << "File " << streamerNames_.at(currentFile_)
        << "\nhas different run number or protocol version than previous\n";
    }
    newHeader_ = true;
    return true;
  }
EventMsgView const* edm::StreamerInputFile::currentRecord ( ) const [inline]

Points to current Record

Definition at line 37 of file StreamerInputFile.h.

References currentEvMsg_.

Referenced by WatcherStreamFileReader::getNextEvent().

{ return currentEvMsg_.get(); }  // non-owning pointer; null until readEventMessage() has stored an event
void edm::StreamerInputFile::logFileAction ( char const *  msg) [private]

Definition at line 288 of file StreamerInputFile.cc.

References currentFileName_, and edm::FlushMessageLog().

Referenced by openStreamerFile(), and ~StreamerInputFile().

                                                       {
    // Emits one "fileAction" log line made of the current time of day, the
    // caller-supplied text msg and the name held in currentFileName_.
    LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
    // Flush right away so the message is not held back in the logger.
    FlushMessageLog();
  }
bool const edm::StreamerInputFile::newHeader ( ) [inline]

Test bit if a new header is encountered. Reading the bit also clears it.

Definition at line 40 of file StreamerInputFile.h.

References newHeader_, and tmp.

Referenced by WatcherStreamFileReader::newHeader().

{
  // Read-and-clear: report whether a new header was seen since the last
  // call, then reset the flag.
  bool const sawNewHeader = newHeader_;
  newHeader_ = false;
  return sawNewHeader;
}
bool edm::StreamerInputFile::next ( void  )

Moves the handler to next Event Record

Definition at line 166 of file StreamerInputFile.cc.

References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().

Referenced by WatcherStreamFileReader::getNextEvent().

                               {
    // Advances to the next event record.
    // Returns true when an event is available through currentRecord(),
    // false when the current file is exhausted and no further file yields
    // an event on this call.

    // Common case: the current file still has an event.
    if(this->readEventMessage()) {
      return true;
    }

    // Only the multi-file mode may move on; stop if there is no next file.
    if(!multiStreams_ || !openNextFile()) {
      return false;
    }

    // A fresh file was opened: clear the end-of-file latch and try once more.
    endOfFile_ = false;
    return this->readEventMessage() != 0;
  }
bool edm::StreamerInputFile::openNextFile ( ) [private]

Definition at line 182 of file StreamerInputFile.cc.

References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.

Referenced by next().

                                       {
     // Opens the file indexed by currentFile_, if there is one, and checks
     // its header against the previous files.
     // Returns true when a new file was opened (currentFile_ is then
     // advanced), false when the list is exhausted.
     // Exceptions from openStreamerFile() and compareHeader() propagate.

     // Compare with '<' rather than '<= size() - 1': size() is unsigned, so
     // 'size() - 1' wraps around to a huge value for an empty list.
     if(currentFile_ >= streamerNames_.size()) {
       return false;
     }

     FDEBUG(10) << "Opening file "
                << streamerNames_.at(currentFile_).c_str() << std::endl;

     openStreamerFile(streamerNames_.at(currentFile_));

     // If start message was already there, then compare the
     // previous and new headers
     if(startMsg_) {
        FDEBUG(10) << "Comparing Header" << std::endl;
        if(!compareHeader()) {
            return false;
        }
     }
     ++currentFile_;
     return true;
  }
void edm::StreamerInputFile::openStreamerFile ( std::string const &  name) [private]

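Closes the previously opened file, if any, and opens the named streamer file for reading. Throws a FileOpenError exception if the file does not exist or cannot be opened.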

Definition at line 70 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::check(), cms::Exception::clearMessage(), currentFileName_, currentFileOpen_, Exception, edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), mergeVDriftHistosByStation::name, IOFlags::OpenRead, findQualityFiles::size, and storage_.

Referenced by openNextFile(), and StreamerInputFile().

                                                           {
    // Switches the reader to the file called name: closes the storage that
    // is still attached, then opens the new file read-only.
    // Throws Exception(errors::FileOpenError) when the file does not exist
    // or the open itself fails.

    // Shut down the previous storage first; report the close only if that
    // file had been opened successfully.
    if(storage_) {
      storage_->close();
      if(currentFileOpen_) {
        logFileAction("  Closed file ");
      }
    }

    // From here on the log lines refer to the new file.
    currentFileOpen_ = false;
    currentFileName_ = name;
    logFileAction("  Initiating request to open file ");

    // Existence check; the reported size is not used afterwards.
    IOOffset fileSize = -1;
    bool const exists = StorageFactory::get()->check(name.c_str(), &fileSize);
    if(!exists) {
      throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
        << "Error Opening Streamer Input File, file does not exist: "
        << name << "\n";
    }

    try {
      storage_.reset(StorageFactory::get()->open(name.c_str(),
                                                 IOFlags::OpenRead));
    }
    catch(cms::Exception& e) {
      // Re-wrap as a FileOpenError that carries this function as context and
      // a message naming the file.
      Exception ex(errors::FileOpenError, "", e);
      ex.addContext("Calling StreamerInputFile::openStreamerFile()");
      ex.clearMessage();
      ex <<  "Error Opening Streamer Input File: " << name << "\n";
      throw ex;
    }

    currentFileOpen_ = true;
    logFileAction("  Successfully opened file ");
  }
IOSize edm::StreamerInputFile::readBytes ( char *  buf,
IOSize  nBytes 
) [private]

Definition at line 103 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::errors::FileReadError, n, and storage_.

Referenced by readEventMessage(), and readStartMessage().

                                                              {
    // Reads up to nBytes from the current storage into buf and returns the
    // byte count reported by the storage layer. A cms::Exception raised by
    // the read is re-thrown as Exception(errors::FileReadError) with this
    // function added as context.
    try {
      return storage_->read(buf, nBytes);
    }
    catch(cms::Exception& ce) {
      Exception ex(errors::FileReadError, "", ce);
      ex.addContext("Calling StreamerInputFile::readBytes()");
      throw ex;
    }
  }
int edm::StreamerInputFile::readEventMessage ( ) [private]

Definition at line 222 of file StreamerInputFile.cc.

References HeaderView::code(), convert32(), currentEvMsg_, endOfFile_, Header::EOFRECORD, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, Exception, edm::errors::FileReadError, EventHeader::lumi_, numberOfEventsToSkip_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().

Referenced by next().

                                          {
    // Reads the next event that is not skipped into eventBuf_ and points
    // currentEvMsg_ at it.
    // Returns 1 when an event was read, 0 once the EOF record has been seen
    // (endOfFile_ is then set and every later call returns 0 immediately).
    // Throws Exception(errors::FileReadError) on a short read or skip, on an
    // unknown record code, or when the event size is not larger than
    // sizeof(EventHeader).
    if(endOfFile_) return 0;

    // Each pass handles one record; the loop ends with the first event that
    // is neither rejected by eventSkipperByID_ nor consumed by the skip count.
    bool eventRead = false;
    while(!eventRead) {

      // First read: only the fixed-size event header.
      IOSize nWant = sizeof(EventHeader);
      IOSize nGot = readBytes(&eventBuf_[0], nWant);
      if(nGot != nWant) {
        throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
          << "Failed reading streamer file, first read in readEventMessage\n"
          << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
      }
      HeaderView head(&eventBuf_[0]);
      uint32 code = head.code();

      // When we get the EOF record we know we have read all events
      // normally and are at the end, return 0 to indicate this
      if(code == Header::EOFRECORD) {
        endOfFile_ = true;
        return 0;
      }
      // If it is not an event nor EOFRECORD then something is wrong.
      if(code != Header::EVENT) {
        throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
          << "Failed reading streamer file, unknown code in event header\n"
          << "code = " << code << "\n";
      }
      // Size of the whole record as stored in the header; it is compared
      // against sizeof(EventHeader) and the difference is read or skipped.
      uint32 eventSize = head.size();
      if(eventSize <= sizeof(EventHeader)) {
        throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
          << "Failed reading streamer file, event header size from data too small\n";
      }
      // Assume the event is wanted; the two filters below may veto it.
      eventRead = true;
      if(eventSkipperByID_) {
        // Filter on run / lumi / event taken from the header just read.
        EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
        if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
          eventRead = false;
        }
      }
      // The skip counter is only charged for events that passed the ID
      // filter; it is decremented through the caller-supplied pointer.
      if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
        eventRead = false;
        --(*numberOfEventsToSkip_);
      }
      // Remaining bytes of this record after the header.
      nWant = eventSize - sizeof(EventHeader);
      if(eventRead) {
        // Grow the buffer if the record does not fit. head is not used past
        // this point, so a reallocation cannot leave it dangling in use.
        if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
        // Second read: the rest of the record, placed right after the header.
        nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
        if(nGot != nWant) {
          throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
            << "Failed reading streamer file, second read in readEventMessage\n"
            << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
        }
      } else {
        // Unwanted event: seek past the rest of the record instead of reading it.
        nGot = skipBytes(nWant);
        if(nGot != nWant) {
          throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
            << "Failed reading streamer file, skip event in readEventMessage\n"
            << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
        }
      }
    }
    // Publish a view over the complete record now held in eventBuf_.
    currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
    return 1;
  }
void edm::StreamerInputFile::readStartMessage ( ) [private]

Reads the init message at the start of the current file. If the first message is not an init message, an exception is thrown.

Definition at line 131 of file StreamerInputFile.cc.

References HeaderView::code(), Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.

Referenced by compareHeader(), and StreamerInputFile().

                                           {
    // Reads the INIT message at the current position of the open file into
    // headerBuf_ and points startMsg_ at it.
    // Throws Exception(errors::FileReadError) on a short read, when the
    // first message is not an INIT message, or when the size stored in the
    // header is not larger than the fixed part read first.

    // First read: the fixed-size leading part that carries code and size.
    // NOTE(review): the length used is sizeof(HeaderView), i.e. the size of
    // the view type - confirm this is the intended on-disk prefix length.
    IOSize nWant = sizeof(HeaderView);
    IOSize nGot = readBytes(&headerBuf_[0], nWant);
    if(nGot != nWant) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Failed reading streamer file, first read in readStartMessage\n";
    }

    HeaderView head(&headerBuf_[0]);
    uint32 code = head.code();
    if(code != Header::INIT) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Expecting an init Message at start of file\n";
    }

    // Reject a size that leaves nothing to read after the fixed part.
    uint32 headerSize = head.size();
    if(headerSize <= sizeof(HeaderView)) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Failed reading streamer file, init header size from data too small\n";
    }

    // Grow the buffer if needed; head is not used past this point, so a
    // reallocation cannot leave it dangling in use.
    if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);

    // Second read: the remainder of the INIT message, placed right after
    // the part already in the buffer.
    nWant = headerSize - sizeof(HeaderView);
    nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
    if(nGot != nWant) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Failed reading streamer file, second read in readStartMessage\n";
    }

    startMsg_.reset(new InitMsgView(&headerBuf_[0]));
  }
IOOffset edm::StreamerInputFile::skipBytes ( IOSize  nBytes) [private]

Definition at line 116 of file StreamerInputFile.cc.

References cms::Exception::addContext(), Storage::CURRENT, edm::errors::FileReadError, n, and storage_.

Referenced by readEventMessage().

                                                     {
    // Moves the read position forward by nBytes and returns how far it
    // actually moved. A cms::Exception raised by the storage layer is
    // re-thrown as Exception(errors::FileReadError) with this function
    // added as context.
    try {
      // The storage call reports the resulting offset, so sample the offset
      // before and after the seek and return the difference.
      IOOffset const before = storage_->position(0, Storage::CURRENT);
      IOOffset const after = storage_->position(nBytes, Storage::CURRENT);
      return after - before;
    }
    catch(cms::Exception& ce) {
      Exception ex(errors::FileReadError, "", ce);
      ex.addContext("Calling StreamerInputFile::skipBytes()");
      throw ex;
    }
  }
InitMsgView const* edm::StreamerInputFile::startMessage ( ) const [inline]

Points to File Start Header/Message

Definition at line 34 of file StreamerInputFile.h.

References startMsg_.

Referenced by WatcherStreamFileReader::getHeader().

{ return startMsg_.get(); }  // non-owning pointer to the INIT message view last stored by readStartMessage()

Member Data Documentation

Definition at line 60 of file StreamerInputFile.h.

Referenced by currentRecord(), and readEventMessage().

unsigned int edm::StreamerInputFile::currentFile_ [private]

keeps track of which file is in use at the moment

Definition at line 65 of file StreamerInputFile.h.

Referenced by compareHeader(), openNextFile(), and StreamerInputFile().

std::string edm::StreamerInputFile::currentFileName_ [private]

Definition at line 68 of file StreamerInputFile.h.

Referenced by logFileAction(), and openStreamerFile().

Definition at line 69 of file StreamerInputFile.h.

Referenced by openStreamerFile(), and ~StreamerInputFile().

Definition at line 76 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

Definition at line 75 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

Definition at line 82 of file StreamerInputFile.h.

Referenced by next(), and readEventMessage().

std::vector<char> edm::StreamerInputFile::eventBuf_ [private]

Buffer to store Event Data

Definition at line 63 of file StreamerInputFile.h.

Referenced by readEventMessage().

Definition at line 71 of file StreamerInputFile.h.

Referenced by readEventMessage().

std::vector<char> edm::StreamerInputFile::headerBuf_ [private]

Buffer to store file Header

Definition at line 62 of file StreamerInputFile.h.

Referenced by readStartMessage().

bool edm::StreamerInputFile::multiStreams_ [private]

True if Multiple Streams are Read

Definition at line 67 of file StreamerInputFile.h.

Referenced by next().

Definition at line 78 of file StreamerInputFile.h.

Referenced by compareHeader(), and newHeader().

Definition at line 73 of file StreamerInputFile.h.

Referenced by readEventMessage().

boost::shared_ptr<InitMsgView> edm::StreamerInputFile::startMsg_ [private]
boost::shared_ptr<Storage> edm::StreamerInputFile::storage_ [private]

Definition at line 80 of file StreamerInputFile.h.

Referenced by openStreamerFile(), readBytes(), skipBytes(), and ~StreamerInputFile().

std::vector<std::string> edm::StreamerInputFile::streamerNames_ [private]

names of Streamer files

Definition at line 66 of file StreamerInputFile.h.

Referenced by compareHeader(), and openNextFile().