CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputFile Class Reference

#include <StreamerInputFile.h>

Public Member Functions

void closeStreamerFile ()
 Needs to be public because of forking. More...
 
EventMsgView const * currentRecord () const
 
bool eofRecordMessage (uint32 const &hlt_path_cnt, EOFRecordView *&)
 
bool newHeader ()
 
bool next ()
 
InitMsgView const * startMessage () const
 
 StreamerInputFile (std::string const &name, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 
 StreamerInputFile (std::vector< std::string > const &names, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 
 ~StreamerInputFile ()
 

Private Member Functions

bool compareHeader ()
 
void logFileAction (char const *msg)
 
bool openNextFile ()
 
void openStreamerFile (std::string const &name)
 
IOSize readBytes (char *buf, IOSize nBytes)
 
int readEventMessage ()
 
void readStartMessage ()
 
IOOffset skipBytes (IOSize nBytes)
 

Private Attributes

boost::shared_ptr< EventMsgView > currentEvMsg_
 
unsigned int currentFile_
 
std::string currentFileName_
 
bool currentFileOpen_
 
uint32 currProto_
 
uint32 currRun_
 
bool endOfFile_
 
std::vector< char > eventBuf_
 
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
 
std::vector< char > headerBuf_
 
bool multiStreams_
 
bool newHeader_
 
boost::shared_ptr< InitMsgView > startMsg_
 
boost::shared_ptr< Storage > storage_
 
std::vector< std::string > streamerNames_
 

Detailed Description

Definition at line 18 of file StreamerInputFile.h.

Constructor & Destructor Documentation

edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
boost::shared_ptr< EventSkipperByID >  eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
)
explicit

Reads a Streamer file

Definition at line 22 of file StreamerInputFile.cc.

References openStreamerFile(), and readStartMessage().

23  :
24  startMsg_(),
25  currentEvMsg_(),
26  headerBuf_(1000*1000),
27  eventBuf_(1000*1000*7),
28  currentFile_(0),
30  multiStreams_(false),
32  currentFileOpen_(false),
33  eventSkipperByID_(eventSkipperByID),
34  currRun_(0),
35  currProto_(0),
36  newHeader_(false),
37  storage_(),
38  endOfFile_(false) {
41  }
std::vector< char > eventBuf_
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
boost::shared_ptr< EventMsgView > currentEvMsg_
std::vector< std::string > streamerNames_
boost::shared_ptr< Storage > storage_
boost::shared_ptr< InitMsgView > startMsg_
std::vector< char > headerBuf_
void openStreamerFile(std::string const &name)
edm::StreamerInputFile::StreamerInputFile ( std::vector< std::string > const &  names,
boost::shared_ptr< EventSkipperByID >  eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
)
explicit

Multiple Streamer files

Definition at line 43 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.

44  :
45  startMsg_(),
46  currentEvMsg_(),
47  headerBuf_(1000*1000),
48  eventBuf_(1000*1000*7),
49  currentFile_(0),
51  multiStreams_(true),
53  currentFileOpen_(false),
54  eventSkipperByID_(eventSkipperByID),
55  currRun_(0),
56  currProto_(0),
57  newHeader_(false),
58  endOfFile_(false) {
59  openStreamerFile(names.at(0));
60  ++currentFile_;
62  currRun_ = startMsg_->run();
63  currProto_ = startMsg_->protocolVersion();
64  }
std::vector< char > eventBuf_
static const HistoName names[]
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
boost::shared_ptr< EventMsgView > currentEvMsg_
std::vector< std::string > streamerNames_
boost::shared_ptr< InitMsgView > startMsg_
std::vector< char > headerBuf_
void openStreamerFile(std::string const &name)
edm::StreamerInputFile::~StreamerInputFile ( )

Definition at line 18 of file StreamerInputFile.cc.

References closeStreamerFile().

18  {
20  }
void closeStreamerFile()
Needs to be public because of forking.

Member Function Documentation

void edm::StreamerInputFile::closeStreamerFile ( )

Needs to be public because of forking.

Closes the underlying storage if a file is currently open, logs the action, and marks the current file as closed.

Definition at line 97 of file StreamerInputFile.cc.

References currentFileOpen_, logFileAction(), and storage_.

Referenced by openStreamerFile(), and ~StreamerInputFile().

97  {
98  if(currentFileOpen_ && storage_) {
99  storage_->close();
100  logFileAction(" Closed file ");
101  }
102  currentFileOpen_ = false;
103  }
boost::shared_ptr< Storage > storage_
void logFileAction(char const *msg)
bool edm::StreamerInputFile::compareHeader ( )
private

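Compares the current file header with the header of the newly opened file. As listed below, a mismatch in run number or protocol version throws an exception with code edm::errors::MismatchedInputFiles, so the "return false" that follows the throw is never reached; the function returns true (and sets newHeader_) when the headers match.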

Definition at line 206 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, edm::hlt::Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.

Referenced by openNextFile().

206  {
207 
208  //Get the new header
210 
211  //Values from new Header should match up
212  if(currRun_ != startMsg_->run() ||
213  currProto_ != startMsg_->protocolVersion()) {
214  throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
215  << "File " << streamerNames_.at(currentFile_)
216  << "\nhas different run number or protocol version than previous\n";
217  return false;
218  }
219  newHeader_ = true;
220  return true;
221  }
std::vector< std::string > streamerNames_
boost::shared_ptr< InitMsgView > startMsg_
EventMsgView const* edm::StreamerInputFile::currentRecord ( ) const
inline

Points to the current event record.

Definition at line 36 of file StreamerInputFile.h.

References currentEvMsg_.

Referenced by WatcherStreamFileReader::getNextEvent().

36 { return currentEvMsg_.get(); }
boost::shared_ptr< EventMsgView > currentEvMsg_
bool edm::StreamerInputFile::eofRecordMessage ( uint32 const &  hlt_path_cnt,
EOFRecordView *&  view 
)

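Retrieves the file's end-of-file record once the file has been completely read. Returns false if the end of the file has not been reached or the last header read is not an EOF record; otherwise sets view to a newly allocated EOFRecordView and returns true.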

Definition at line 286 of file StreamerInputFile.cc.

References HeaderView::code(), endOfFile_, Header::EOFRECORD, eventBuf_, edm::hlt::Exception, edm::errors::FileReadError, readBytes(), and HeaderView::size().

286  {
287  if(!endOfFile_) return false;
288 
289  HeaderView head(&eventBuf_[0]);
290  uint32 code = head.code();
291 
292  if(code != Header::EOFRECORD) {
293  return false;
294  }
295 
296  uint32 eofSize = head.size();
297  IOSize nWant = eofSize - sizeof(EventHeader);
298  if(nWant>0) {
299  IOSize nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
300  if(nGot != nWant) {
301  throw Exception(errors::FileReadError, "StreamerInputFile::eofRecordMessage")
302  << "Failed reading streamer file, second read in eofRecordMessage\n"
303  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
304  }
305  }
306 
307  view = new EOFRecordView(&eventBuf_[0], hlt_path_cnt);
308  return true;
309  }
std::vector< char > eventBuf_
unsigned int uint32
Definition: MsgTools.h:13
IOSize readBytes(char *buf, IOSize nBytes)
size_t IOSize
Definition: IOTypes.h:14
void edm::StreamerInputFile::logFileAction ( char const *  msg)
private

Definition at line 311 of file StreamerInputFile.cc.

References currentFileName_, and edm::FlushMessageLog().

Referenced by closeStreamerFile(), and openStreamerFile().

311  {
312  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
313  FlushMessageLog();
314  }
void FlushMessageLog()
bool edm::StreamerInputFile::newHeader ( )
inline

Tests whether a new header has been encountered. The flag is reset to false by the call.

Definition at line 42 of file StreamerInputFile.h.

References newHeader_, and tmp.

Referenced by WatcherStreamFileReader::newHeader().

42 { bool tmp = newHeader_; newHeader_ = false; return tmp;}
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
bool edm::StreamerInputFile::next ( void  )

Definition at line 168 of file StreamerInputFile.cc.

References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().

Referenced by WatcherStreamFileReader::getNextEvent().

168  {
169  if(this->readEventMessage()) {
170  return true;
171  }
172  if(multiStreams_) {
173  //Try opening next file
174  if(openNextFile()) {
175  endOfFile_ = false;
176  if(this->readEventMessage()) {
177  return true;
178  }
179  }
180  }
181  return false;
182  }
bool edm::StreamerInputFile::openNextFile ( )
private

Definition at line 184 of file StreamerInputFile.cc.

References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.

Referenced by next().

184  {
185 
186  if(currentFile_ <= streamerNames_.size() - 1) {
187  FDEBUG(10) << "Opening file "
188  << streamerNames_.at(currentFile_).c_str() << std::endl;
189 
191 
192  // If start message was already there, then compare the
193  // previous and new headers
194  if(startMsg_) {
195  FDEBUG(10) << "Comparing Header" << std::endl;
196  if(!compareHeader()) {
197  return false;
198  }
199  }
200  ++currentFile_;
201  return true;
202  }
203  return false;
204  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< std::string > streamerNames_
boost::shared_ptr< InitMsgView > startMsg_
void openStreamerFile(std::string const &name)
void edm::StreamerInputFile::openStreamerFile ( std::string const &  name)
private

Definition at line 67 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::check(), cms::Exception::clearMessage(), closeStreamerFile(), currentFileName_, currentFileOpen_, alignCSCRings::e, edm::hlt::Exception, edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), mergeVDriftHistosByStation::name, IOFlags::OpenRead, findQualityFiles::size, and storage_.

Referenced by openNextFile(), and StreamerInputFile().

67  {
68 
70 
72  logFileAction(" Initiating request to open file ");
73 
74  IOOffset size = -1;
75  if(StorageFactory::get()->check(name.c_str(), &size)) {
76  try {
77  storage_.reset(StorageFactory::get()->open(name.c_str(),
79  }
80  catch(cms::Exception& e) {
82  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
83  ex.clearMessage();
84  ex << "Error Opening Streamer Input File: " << name << "\n";
85  throw ex;
86  }
87  } else {
88  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
89  << "Error Opening Streamer Input File, file does not exist: "
90  << name << "\n";
91  }
92  currentFileOpen_ = true;
93  logFileAction(" Successfully opened file ");
94  }
void closeStreamerFile()
Needs to be public because of forking.
static StorageFactory * get(void)
boost::shared_ptr< Storage > storage_
static void check(Principal const &p, std::string const &id)
void logFileAction(char const *msg)
int64_t IOOffset
Definition: IOTypes.h:19
tuple size
Write out results.
IOSize edm::StreamerInputFile::readBytes ( char *  buf,
IOSize  nBytes 
)
private

Definition at line 105 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::errors::FileReadError, n, and storage_.

Referenced by eofRecordMessage(), readEventMessage(), and readStartMessage().

105  {
106  IOSize n = 0;
107  try {
108  n = storage_->read(buf, nBytes);
109  }
110  catch(cms::Exception& ce) {
111  Exception ex(errors::FileReadError, "", ce);
112  ex.addContext("Calling StreamerInputFile::readBytes()");
113  throw ex;
114  }
115  return n;
116  }
boost::shared_ptr< Storage > storage_
size_t IOSize
Definition: IOTypes.h:14
int edm::StreamerInputFile::readEventMessage ( )
private

Definition at line 224 of file StreamerInputFile.cc.

References HeaderView::code(), convert32(), currentEvMsg_, endOfFile_, Header::EOFRECORD, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, edm::hlt::Exception, edm::errors::FileReadError, EventHeader::lumi_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().

Referenced by next().

224  {
225  if(endOfFile_) return 0;
226 
227  bool eventRead = false;
228  while(!eventRead) {
229 
230  IOSize nWant = sizeof(EventHeader);
231  IOSize nGot = readBytes(&eventBuf_[0], nWant);
232  if(nGot != nWant) {
233  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
234  << "Failed reading streamer file, first read in readEventMessage\n"
235  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
236  }
237  HeaderView head(&eventBuf_[0]);
238  uint32 code = head.code();
239 
240  // When we get the EOF record we know we have read all events
241  // normally and are at the end, return 0 to indicate this
242  if(code == Header::EOFRECORD) {
243  endOfFile_ = true;
244  return 0;
245  }
246  // If it is not an event nor EOFRECORD then something is wrong.
247  if(code != Header::EVENT) {
248  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
249  << "Failed reading streamer file, unknown code in event header\n"
250  << "code = " << code << "\n";
251  }
252  uint32 eventSize = head.size();
253  if(eventSize <= sizeof(EventHeader)) {
254  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
255  << "Failed reading streamer file, event header size from data too small\n";
256  }
257  eventRead = true;
258  if(eventSkipperByID_) {
259  EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
260  if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
261  eventRead = false;
262  }
263  }
264  nWant = eventSize - sizeof(EventHeader);
265  if(eventRead) {
266  if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
267  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
268  if(nGot != nWant) {
269  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
270  << "Failed reading streamer file, second read in readEventMessage\n"
271  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
272  }
273  } else {
274  nGot = skipBytes(nWant);
275  if(nGot != nWant) {
276  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
277  << "Failed reading streamer file, skip event in readEventMessage\n"
278  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
279  }
280  }
281  }
282  currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
283  return 1;
284  }
std::vector< char > eventBuf_
char_uint32 event_
Definition: EventMessage.h:62
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:63
boost::shared_ptr< EventMsgView > currentEvMsg_
IOOffset skipBytes(IOSize nBytes)
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:61
IOSize readBytes(char *buf, IOSize nBytes)
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:30
size_t IOSize
Definition: IOTypes.h:14
void edm::StreamerInputFile::readStartMessage ( )
private

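Reads the INIT message at the start of the file into headerBuf_ and stores a view of it in startMsg_. Throws an exception with code edm::errors::FileReadError if a read returns fewer bytes than requested, if the first message is not an INIT message, or if the header size read from the data is too small.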

Definition at line 133 of file StreamerInputFile.cc.

References HeaderView::code(), edm::hlt::Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.

Referenced by compareHeader(), and StreamerInputFile().

133  {
134  IOSize nWant = sizeof(HeaderView);
135  IOSize nGot = readBytes(&headerBuf_[0], nWant);
136  if(nGot != nWant) {
137  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
138  << "Failed reading streamer file, first read in readStartMessage\n";
139  }
140 
141  HeaderView head(&headerBuf_[0]);
142  uint32 code = head.code();
143  if(code != Header::INIT)
144  {
145  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
146  << "Expecting an init Message at start of file\n";
147  return;
148  }
149 
150  uint32 headerSize = head.size();
151  if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
152 
153  if(headerSize > sizeof(HeaderView)) {
154  nWant = headerSize - sizeof(HeaderView);
155  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
156  if(nGot != nWant) {
157  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
158  << "Failed reading streamer file, second read in readStartMessage\n";
159  }
160  } else {
161  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
162  << "Failed reading streamer file, init header size from data too small\n";
163  }
164 
165  startMsg_.reset(new InitMsgView(&headerBuf_[0]));
166  }
unsigned int uint32
Definition: MsgTools.h:13
boost::shared_ptr< InitMsgView > startMsg_
IOSize readBytes(char *buf, IOSize nBytes)
std::vector< char > headerBuf_
size_t IOSize
Definition: IOTypes.h:14
IOOffset edm::StreamerInputFile::skipBytes ( IOSize  nBytes)
private

Definition at line 118 of file StreamerInputFile.cc.

References cms::Exception::addContext(), Storage::CURRENT, edm::errors::FileReadError, n, and storage_.

Referenced by readEventMessage().

118  {
119  IOOffset n = 0;
120  try {
121  // We wish to return the number of bytes skipped, not the final offset.
122  n = storage_->position(0, Storage::CURRENT);
123  n = storage_->position(nBytes, Storage::CURRENT) - n;
124  }
125  catch(cms::Exception& ce) {
126  Exception ex(errors::FileReadError, "", ce);
127  ex.addContext("Calling StreamerInputFile::skipBytes()");
128  throw ex;
129  }
130  return n;
131  }
boost::shared_ptr< Storage > storage_
int64_t IOOffset
Definition: IOTypes.h:19
InitMsgView const* edm::StreamerInputFile::startMessage ( ) const
inline

Points to the file start header/message.

Definition at line 33 of file StreamerInputFile.h.

References startMsg_.

Referenced by WatcherStreamFileReader::getHeader().

33 { return startMsg_.get(); }
boost::shared_ptr< InitMsgView > startMsg_

Member Data Documentation

boost::shared_ptr<EventMsgView> edm::StreamerInputFile::currentEvMsg_
private

Definition at line 64 of file StreamerInputFile.h.

Referenced by currentRecord(), and readEventMessage().

unsigned int edm::StreamerInputFile::currentFile_
private

Keeps track of which file is in use at the moment (index into streamerNames_).

Definition at line 69 of file StreamerInputFile.h.

Referenced by compareHeader(), openNextFile(), and StreamerInputFile().

std::string edm::StreamerInputFile::currentFileName_
private

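File name appended to the file-action log messages written by logFileAction(); presumably the name of the streamer file currently being opened or read (the assignment is not visible in the listings on this page).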

Definition at line 72 of file StreamerInputFile.h.

Referenced by logFileAction(), and openStreamerFile().

bool edm::StreamerInputFile::currentFileOpen_
private

Definition at line 73 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), and openStreamerFile().

uint32 edm::StreamerInputFile::currProto_
private

Definition at line 78 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

uint32 edm::StreamerInputFile::currRun_
private

Definition at line 77 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

bool edm::StreamerInputFile::endOfFile_
private

Definition at line 84 of file StreamerInputFile.h.

Referenced by eofRecordMessage(), next(), and readEventMessage().

std::vector<char> edm::StreamerInputFile::eventBuf_
private

Buffer to store event data.

Definition at line 67 of file StreamerInputFile.h.

Referenced by eofRecordMessage(), and readEventMessage().

boost::shared_ptr<EventSkipperByID> edm::StreamerInputFile::eventSkipperByID_
private

Definition at line 75 of file StreamerInputFile.h.

Referenced by readEventMessage().

std::vector<char> edm::StreamerInputFile::headerBuf_
private

Definition at line 66 of file StreamerInputFile.h.

Referenced by readStartMessage().

bool edm::StreamerInputFile::multiStreams_
private

True if multiple streams are read.

Definition at line 71 of file StreamerInputFile.h.

Referenced by next().

bool edm::StreamerInputFile::newHeader_
private

Definition at line 80 of file StreamerInputFile.h.

Referenced by compareHeader(), and newHeader().

boost::shared_ptr<InitMsgView> edm::StreamerInputFile::startMsg_
private
boost::shared_ptr<Storage> edm::StreamerInputFile::storage_
private

Definition at line 82 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), openStreamerFile(), readBytes(), and skipBytes().

std::vector<std::string> edm::StreamerInputFile::streamerNames_
private

Names of the streamer files.

Definition at line 70 of file StreamerInputFile.h.

Referenced by compareHeader(), and openNextFile().