CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

edm::StreamerInputFile Class Reference

#include <StreamerInputFile.h>

List of all members.

Public Member Functions

void closeStreamerFile ()
 Needs to be public because of forking.
EventMsgView const * currentRecord () const
bool eofRecordMessage (uint32 const &hlt_path_cnt, EOFRecordView *&)
bool newHeader ()
bool next ()
InitMsgView const * startMessage () const
 StreamerInputFile (std::vector< std::string > const &names, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 StreamerInputFile (std::string const &name, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 ~StreamerInputFile ()

Private Member Functions

bool compareHeader ()
void logFileAction (char const *msg)
bool openNextFile ()
void openStreamerFile (std::string const &name)
IOSize readBytes (char *buf, IOSize nBytes)
int readEventMessage ()
void readStartMessage ()
IOOffset skipBytes (IOSize nBytes)

Private Attributes

boost::shared_ptr< EventMsgView > currentEvMsg_
unsigned int currentFile_
std::string currentFileName_
bool currentFileOpen_
uint32 currProto_
uint32 currRun_
bool endOfFile_
std::vector< char > eventBuf_
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
std::vector< char > headerBuf_
bool multiStreams_
bool newHeader_
boost::shared_ptr< InitMsgView > startMsg_
boost::shared_ptr< Storage > storage_
std::vector< std::string > streamerNames_

Detailed Description

Definition at line 18 of file StreamerInputFile.h.


Constructor & Destructor Documentation

edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
boost::shared_ptr< EventSkipperByID > eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
) [explicit]

Reads a Streamer file

Definition at line 22 of file StreamerInputFile.cc.

References openStreamerFile(), and readStartMessage().

edm::StreamerInputFile::StreamerInputFile ( std::vector< std::string > const &  names,
boost::shared_ptr< EventSkipperByID > eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
) [explicit]

Multiple Streamer files

Definition at line 43 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.

                                                                                           :
    // Multi-file constructor: remembers the full list of names, opens the
    // first file and records its run number and protocol version so that
    // later files can be checked against them in compareHeader().
    // The shared pointers start out empty; readStartMessage() and
    // readEventMessage() fill them in.
    startMsg_(),
    currentEvMsg_(),
    // Initial buffer sizes in bytes; both buffers are grown on demand by the
    // read functions when a message is larger than the current size.
    headerBuf_(1000*1000),
    eventBuf_(1000*1000*7),
    currentFile_(0),
    streamerNames_(names),
    multiStreams_(true),
    currentFileName_(),
    currentFileOpen_(false),
    eventSkipperByID_(eventSkipperByID),
    currRun_(0),
    currProto_(0),
    newHeader_(false),
    endOfFile_(false) {
    // at() throws std::out_of_range if 'names' is empty.
    openStreamerFile(names.at(0));
    // From here on currentFile_ is the index of the next file that
    // openNextFile() will try to open.
    ++currentFile_;
    readStartMessage();
    // Reference values for the header comparison of subsequent files.
    currRun_ = startMsg_->run();
    currProto_ = startMsg_->protocolVersion();
  }
edm::StreamerInputFile::~StreamerInputFile ( )

Definition at line 18 of file StreamerInputFile.cc.

References closeStreamerFile().


Member Function Documentation

void edm::StreamerInputFile::closeStreamerFile ( )

Needs to be public because of forking.

Test bit if a new header is encountered

Definition at line 97 of file StreamerInputFile.cc.

References currentFileOpen_, logFileAction(), and storage_.

Referenced by openStreamerFile(), and ~StreamerInputFile().

                                       {
    // Nothing to release unless a file is flagged open AND a storage object
    // is attached; in that case just make sure the flag ends up cleared.
    if(!currentFileOpen_ || !storage_) {
      currentFileOpen_ = false;
      return;
    }

    // Close the underlying storage and log the action before clearing the flag.
    storage_->close();
    logFileAction("  Closed file ");
    currentFileOpen_ = false;
  }
bool edm::StreamerInputFile::compareHeader ( ) [private]

Compares current File header with the newly opened file header Returns false in case of mismatch

Definition at line 206 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.

Referenced by openNextFile().

                                        {

    // Reads the init message of the file that was just opened and checks it
    // against the run number and protocol version remembered in currRun_ and
    // currProto_.
    //
    // Returns true when the headers match, and sets newHeader_ as a side
    // effect. A mismatch is reported by throwing
    // Exception(errors::MismatchedInputFiles); this function never actually
    // returns false.

    // Get the new header; this replaces startMsg_.
    readStartMessage();

    // Values from the new header must match the remembered ones.
    if(currRun_ != startMsg_->run() ||
        currProto_ != startMsg_->protocolVersion()) {
      // currentFile_ is used as the index of the offending file, so the
      // caller must not have advanced it yet (openNextFile() increments it
      // only after this function has returned).
      throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
        << "File " << streamerNames_.at(currentFile_)
        << "\nhas different run number or protocol version than previous\n";
    }

    newHeader_ = true;
    return true;
  }
EventMsgView const* edm::StreamerInputFile::currentRecord ( ) const [inline]

Points to current Record

Definition at line 36 of file StreamerInputFile.h.

References currentEvMsg_.

Referenced by WatcherStreamFileReader::getNextEvent().

{
  // Non-owning pointer to the view held in currentEvMsg_; null while that
  // shared pointer is empty.
  return currentEvMsg_.get();
}
bool edm::StreamerInputFile::eofRecordMessage ( uint32 const &  hlt_path_cnt,
EOFRecordView *&  view 
)

Returns the end-of-file record if the file has been completely read

Definition at line 286 of file StreamerInputFile.cc.

References HeaderView::code(), endOfFile_, Header::EOFRECORD, eventBuf_, Exception, edm::errors::FileReadError, readBytes(), and HeaderView::size().

                                                                                           {
    // Reads the remainder of the end-of-file record and hands back a view of it.
    //
    // hlt_path_cnt  forwarded unchanged to the EOFRecordView constructor.
    // view          out-parameter; set to a newly allocated EOFRecordView on
    //               success and left untouched otherwise. Nothing in this
    //               function releases the object (presumably the caller is
    //               expected to delete it -- TODO confirm).
    //
    // Returns true only if endOfFile_ is set and the buffered header carries
    // the EOFRECORD code; returns false otherwise.
    // Throws Exception(errors::FileReadError) if the rest of the record
    // cannot be read completely.
    if(!endOfFile_) return false;

    // eventBuf_ still starts with the sizeof(EventHeader) bytes that
    // readEventMessage() read before it set endOfFile_.
    HeaderView head(&eventBuf_[0]);
    uint32 code = head.code();

    if(code != Header::EOFRECORD) {
      return false;
    }

    uint32 const eofSize = head.size();

    // Only read more if the record extends past the bytes already buffered.
    // Comparing before subtracting avoids unsigned wrap-around when the
    // recorded size is smaller than sizeof(EventHeader).
    if(eofSize > sizeof(EventHeader)) {
      // Grow the buffer first so the read below cannot run past its end
      // (readEventMessage() does the same for event payloads). 'head' is not
      // used after this point, so a reallocation cannot leave it dangling.
      if(eventBuf_.size() < eofSize) eventBuf_.resize(eofSize);

      IOSize const nWant = eofSize - sizeof(EventHeader);
      IOSize const nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
      if(nGot != nWant) {
          throw Exception(errors::FileReadError, "StreamerInputFile::eofRecordMessage")
            << "Failed reading streamer file, second read in eofRecordMessage\n"
            << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
      }
    }

    view = new EOFRecordView(&eventBuf_[0], hlt_path_cnt);
    return true;
  }
void edm::StreamerInputFile::logFileAction ( char const *  msg) [private]

Definition at line 311 of file StreamerInputFile.cc.

References currentFileName_, and edm::FlushMessageLog().

Referenced by closeStreamerFile(), and openStreamerFile().

                                                       {
    // Writes one "fileAction" log entry consisting of a TimeOfDay() stamp,
    // the caller-supplied text 'msg' and currentFileName_, then flushes the
    // message log immediately.
    // The LogAbsolute object is a temporary, so it is destroyed at the end of
    // its statement, i.e. before FlushMessageLog() is called.
    LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
    FlushMessageLog();
  }
bool edm::StreamerInputFile::newHeader ( ) [inline]

Test bit if a new header is encountered

Definition at line 42 of file StreamerInputFile.h.

References newHeader_, and tmp.

Referenced by WatcherStreamFileReader::newHeader().

{
  // Read-and-clear accessor: hands back the flag's current value and leaves
  // the flag false afterwards.
  bool const wasSet = newHeader_;
  newHeader_ = false;
  return wasSet;
}
bool edm::StreamerInputFile::next ( void  )

Definition at line 168 of file StreamerInputFile.cc.

References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().

Referenced by WatcherStreamFileReader::getNextEvent().

                               {
    // Advances to the next event. Returns true if an event was read and
    // false when nothing more can be read.

    // First try the file that is currently open.
    if(this->readEventMessage()) {
      return true;
    }

    // Single-file mode has nothing to fall back on.
    if(!multiStreams_) {
      return false;
    }

    // Multi-file mode: try opening the next file in the list.
    if(!openNextFile()) {
      return false;
    }

    // A fresh file is open, so the end-of-file flag no longer applies.
    endOfFile_ = false;
    return this->readEventMessage() != 0;
  }
bool edm::StreamerInputFile::openNextFile ( ) [private]

Definition at line 184 of file StreamerInputFile.cc.

References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.

Referenced by next().

                                       {

     // Opens the file at index currentFile_ in streamerNames_ and, if an init
     // message has been read before, verifies that the new file's header
     // matches it.
     //
     // Returns true if a file was opened (currentFile_ is then advanced), and
     // false if there are no more files or the header comparison reports
     // failure. Exceptions from openStreamerFile() and compareHeader()
     // propagate to the caller.

     // Written as '>=' on the index rather than '<= size() - 1': with an
     // empty list 'size() - 1' wraps around to a huge unsigned value and the
     // bound check would wrongly pass.
     if(currentFile_ >= streamerNames_.size()) {
       return false;
     }

     std::string const& nextName = streamerNames_.at(currentFile_);
     FDEBUG(10) << "Opening file "
                << nextName.c_str() << std::endl;

     openStreamerFile(nextName);

     // If start message was already there, then compare the
     // previous and new headers
     if(startMsg_) {
        FDEBUG(10) << "Comparing Header" << std::endl;
        if(!compareHeader()) {
            return false;
        }
     }

     // Advance only after the header check: compareHeader() uses
     // currentFile_ to name the offending file in its error message.
     ++currentFile_;
     return true;
  }
void edm::StreamerInputFile::openStreamerFile ( std::string const &  name) [private]

Definition at line 67 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::check(), cms::Exception::clearMessage(), closeStreamerFile(), currentFileName_, currentFileOpen_, alignCSCRings::e, Exception, edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), mergeVDriftHistosByStation::name, IOFlags::OpenRead, findQualityFiles::size, and storage_.

Referenced by openNextFile(), and StreamerInputFile().

                                                           {

    // Opens 'name' for reading through StorageFactory, replacing whatever
    // file was open before. Throws Exception(errors::FileOpenError) if the
    // check() call fails or if open() raises a cms::Exception.

    // Release any previously opened file first.
    closeStreamerFile();

    currentFileName_ = name;
    logFileAction("  Initiating request to open file ");

    // check() also fills in a size; the value is not used afterwards.
    IOOffset fileSize = -1;
    if(!StorageFactory::get()->check(name.c_str(), &fileSize)) {
      throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
        << "Error Opening Streamer Input File, file does not exist: "
        << name << "\n";
    }

    try {
      storage_.reset(StorageFactory::get()->open(name.c_str(),
                                                 IOFlags::OpenRead));
    }
    catch(cms::Exception& e) {
      // Re-wrap the lower-level failure as a FileOpenError whose message
      // names the file.
      Exception ex(errors::FileOpenError, "", e);
      ex.addContext("Calling StreamerInputFile::openStreamerFile()");
      ex.clearMessage();
      ex <<  "Error Opening Streamer Input File: " << name << "\n";
      throw ex;
    }

    currentFileOpen_ = true;
    logFileAction("  Successfully opened file ");
  }
IOSize edm::StreamerInputFile::readBytes ( char *  buf,
IOSize  nBytes 
) [private]

Definition at line 105 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::errors::FileReadError, n, and storage_.

Referenced by eofRecordMessage(), readEventMessage(), and readStartMessage().

                                                              {
    // Reads up to nBytes from storage_ into buf and returns the count that
    // storage_->read() reports. A cms::Exception raised by the read is
    // re-thrown as Exception(errors::FileReadError) with added context.
    try {
      IOSize const nRead = storage_->read(buf, nBytes);
      return nRead;
    }
    catch(cms::Exception& ce) {
      Exception ex(errors::FileReadError, "", ce);
      ex.addContext("Calling StreamerInputFile::readBytes()");
      throw ex;
    }
  }
int edm::StreamerInputFile::readEventMessage ( ) [private]

Definition at line 224 of file StreamerInputFile.cc.

References HeaderView::code(), convert32(), currentEvMsg_, endOfFile_, Header::EOFRECORD, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, Exception, edm::errors::FileReadError, EventHeader::lumi_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().

Referenced by next().

                                          {
    // Reads the next event message into eventBuf_ and points currentEvMsg_
    // at it.
    //
    // Returns 1 if an event was read, 0 if the end of the file has been
    // reached (either endOfFile_ was already set or an EOFRECORD header was
    // just read).
    // Throws Exception(errors::FileReadError) on a short read or skip, on an
    // unknown header code, or when the recorded event size does not exceed
    // sizeof(EventHeader).
    // Events rejected by eventSkipperByID_ are skipped over and the loop
    // continues with the next record.

    // The flag stays set until next() clears it after opening another file.
    if(endOfFile_) return 0;

    // Loop until an event that is not skipped has been read.
    bool eventRead = false;
    while(!eventRead) {

      // Step 1: read the fixed-size event header into the start of the buffer.
      IOSize nWant = sizeof(EventHeader);
      IOSize nGot = readBytes(&eventBuf_[0], nWant);
      if(nGot != nWant) {
        throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
          << "Failed reading streamer file, first read in readEventMessage\n"
          << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
      }
      HeaderView head(&eventBuf_[0]);
      uint32 code = head.code();

      // When we get the EOF record we know we have read all events
      // normally and are at the end, return 0 to indicate this.
      // Only the first sizeof(EventHeader) bytes of the EOF record have been
      // consumed at this point; eofRecordMessage() reads the rest.
      if(code == Header::EOFRECORD) {
        endOfFile_ = true;
        return 0;
      }
      // If it is not an event nor EOFRECORD then something is wrong.
      if(code != Header::EVENT) {
        throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
          << "Failed reading streamer file, unknown code in event header\n"
          << "code = " << code << "\n";
      }
      // Total size of the message, header included, as recorded in the data.
      uint32 eventSize = head.size();
      if(eventSize <= sizeof(EventHeader)) {
        throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
          << "Failed reading streamer file, event header size from data too small\n";
      }
      // Tentatively accept the event; the skipper (if any) may veto it.
      eventRead = true;
      if(eventSkipperByID_) {
        // Reinterpret the buffered bytes as an EventHeader to reach the raw
        // run/lumi/event fields, which are decoded with convert32().
        EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
        if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
          eventRead = false;
        }
      }
      // Step 2: deal with the bytes that follow the header.
      nWant = eventSize - sizeof(EventHeader);
      if(eventRead) {
        // Grow the buffer if the whole message does not fit yet.
        if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
        nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
        if(nGot != nWant) {
          throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
            << "Failed reading streamer file, second read in readEventMessage\n"
            << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
        }
      } else {
        // Skipped event: move past the payload instead of reading it.
        nGot = skipBytes(nWant);
        if(nGot != nWant) {
          throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
            << "Failed reading streamer file, skip event in readEventMessage\n"
            << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
        }
      }
    }
    // The view is constructed from a pointer to the start of eventBuf_.
    currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
    return 1;
  }
void edm::StreamerInputFile::readStartMessage ( ) [private]

Not an init message should return

Definition at line 133 of file StreamerInputFile.cc.

References HeaderView::code(), Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.

Referenced by compareHeader(), and StreamerInputFile().

                                           {
    // Reads the init message at the current position of the open file into
    // headerBuf_ and makes startMsg_ refer to it.
    //
    // Throws Exception(errors::FileReadError) if either read comes up short,
    // if the message code is not Header::INIT, or if the recorded message
    // size does not exceed the size of the fixed part read first.
    //
    // NOTE(review): the fixed-size first read is sized with
    // sizeof(HeaderView), i.e. the size of the view class rather than of an
    // on-disk header struct -- confirm this is intended.
    IOSize nWant = sizeof(HeaderView);
    IOSize nGot = readBytes(&headerBuf_[0], nWant);
    if(nGot != nWant) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Failed reading streamer file, first read in readStartMessage\n";
    }

    HeaderView head(&headerBuf_[0]);
    uint32 code = head.code();
    if(code != Header::INIT)
    {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Expecting an init Message at start of file\n";
    }

    // Validate the recorded size before using it for the resize and for the
    // subtraction below.
    uint32 const headerSize = head.size();
    if(headerSize <= sizeof(HeaderView)) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Failed reading streamer file, init header size from data too small\n";
    }

    // Grow the buffer if the whole message does not fit. 'head' is not used
    // after this point, so a reallocation cannot leave it dangling.
    if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);

    nWant = headerSize - sizeof(HeaderView);
    nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
    if(nGot != nWant) {
      throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
        << "Failed reading streamer file, second read in readStartMessage\n";
    }

    startMsg_.reset(new InitMsgView(&headerBuf_[0]));
  }
IOOffset edm::StreamerInputFile::skipBytes ( IOSize  nBytes) [private]

Definition at line 118 of file StreamerInputFile.cc.

References cms::Exception::addContext(), Storage::CURRENT, edm::errors::FileReadError, n, and storage_.

Referenced by readEventMessage().

                                                     {
    // Moves the read position of storage_ forward by nBytes relative to the
    // current position and returns how far it actually moved. A
    // cms::Exception raised while seeking is re-thrown as
    // Exception(errors::FileReadError) with added context.
    try {
      // We wish to return the number of bytes skipped, not the final offset,
      // so note the starting position first (a zero-length relative seek).
      IOOffset const startPos = storage_->position(0, Storage::CURRENT);
      IOOffset const skipped = storage_->position(nBytes, Storage::CURRENT) - startPos;
      return skipped;
    }
    catch(cms::Exception& ce) {
      Exception ex(errors::FileReadError, "", ce);
      ex.addContext("Calling StreamerInputFile::skipBytes()");
      throw ex;
    }
  }
InitMsgView const* edm::StreamerInputFile::startMessage ( ) const [inline]

Points to File Start Header/Message

Definition at line 33 of file StreamerInputFile.h.

References startMsg_.

Referenced by WatcherStreamFileReader::getHeader().

{
  // Non-owning pointer to the view held in startMsg_; null while that shared
  // pointer is empty.
  return startMsg_.get();
}

Member Data Documentation

Definition at line 64 of file StreamerInputFile.h.

Referenced by currentRecord(), and readEventMessage().

unsigned int edm::StreamerInputFile::currentFile_ [private]

keeps track of which file is in use at the moment

Definition at line 69 of file StreamerInputFile.h.

Referenced by compareHeader(), openNextFile(), and StreamerInputFile().

True if Multiple Streams are Read

Definition at line 72 of file StreamerInputFile.h.

Referenced by logFileAction(), and openStreamerFile().

Definition at line 73 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), and openStreamerFile().

Definition at line 78 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

Definition at line 77 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

Definition at line 84 of file StreamerInputFile.h.

Referenced by eofRecordMessage(), next(), and readEventMessage().

std::vector<char> edm::StreamerInputFile::eventBuf_ [private]

Buffer to store Event Data

Definition at line 67 of file StreamerInputFile.h.

Referenced by eofRecordMessage(), and readEventMessage().

Definition at line 75 of file StreamerInputFile.h.

Referenced by readEventMessage().

std::vector<char> edm::StreamerInputFile::headerBuf_ [private]

Definition at line 66 of file StreamerInputFile.h.

Referenced by readStartMessage().

names of Streamer files

Definition at line 71 of file StreamerInputFile.h.

Referenced by next().

Definition at line 80 of file StreamerInputFile.h.

Referenced by compareHeader(), and newHeader().

boost::shared_ptr<InitMsgView> edm::StreamerInputFile::startMsg_ [private]
boost::shared_ptr<Storage> edm::StreamerInputFile::storage_ [private]

Definition at line 82 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), openStreamerFile(), readBytes(), and skipBytes().

std::vector<std::string> edm::StreamerInputFile::streamerNames_ [private]

names of Streamer files

Definition at line 70 of file StreamerInputFile.h.

Referenced by compareHeader(), and openNextFile().