CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputFile Class Reference

#include <StreamerInputFile.h>

Public Member Functions

EventMsgView const * currentRecord () const
 
bool const newHeader ()
 
bool next ()
 
InitMsgView const * startMessage () const
 
 StreamerInputFile (std::string const &name, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 
 StreamerInputFile (std::vector< std::string > const &names, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())
 
 ~StreamerInputFile ()
 

Private Member Functions

bool compareHeader ()
 
void logFileAction (char const *msg)
 
bool openNextFile ()
 
void openStreamerFile (std::string const &name)
 
IOSize readBytes (char *buf, IOSize nBytes)
 
int readEventMessage ()
 
void readStartMessage ()
 
IOOffset skipBytes (IOSize nBytes)
 

Private Attributes

boost::shared_ptr< EventMsgView > currentEvMsg_
 
unsigned int currentFile_
 
std::string currentFileName_
 
bool currentFileOpen_
 
uint32 currProto_
 
uint32 currRun_
 
bool endOfFile_
 
std::vector< char > eventBuf_
 
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
 
std::vector< char > headerBuf_
 
bool multiStreams_
 
bool newHeader_
 
int * numberOfEventsToSkip_
 
boost::shared_ptr< InitMsgView > startMsg_
 
boost::shared_ptr< Storage > storage_
 
std::vector< std::string > streamerNames_
 

Detailed Description

Definition at line 17 of file StreamerInputFile.h.

Constructor & Destructor Documentation

edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
int *  numberOfEventsToSkip = 0,
boost::shared_ptr< EventSkipperByID > eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
)
explicit

Reads a Streamer file

Definition at line 25 of file StreamerInputFile.cc.

References openStreamerFile(), and readStartMessage().

27  :
28  startMsg_(),
29  currentEvMsg_(),
30  headerBuf_(1000*1000),
31  eventBuf_(1000*1000*7),
32  multiStreams_(false),
34  currentFileOpen_(false),
35  eventSkipperByID_(eventSkipperByID),
36  numberOfEventsToSkip_(numberOfEventsToSkip),
37  newHeader_(false),
38  endOfFile_(false) {
41  }
std::vector< char > eventBuf_
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
boost::shared_ptr< EventMsgView > currentEvMsg_
boost::shared_ptr< InitMsgView > startMsg_
std::vector< char > headerBuf_
void openStreamerFile(std::string const &name)
edm::StreamerInputFile::StreamerInputFile ( std::vector< std::string > const &  names,
int *  numberOfEventsToSkip = 0,
boost::shared_ptr< EventSkipperByID > eventSkipperByID = boost::shared_ptr<EventSkipperByID>() 
)
explicit

Multiple Streamer files

Definition at line 44 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.

46  :
47  startMsg_(),
48  currentEvMsg_(),
49  headerBuf_(1000*1000),
50  eventBuf_(1000*1000*7),
51  currentFile_(0),
53  multiStreams_(true),
55  currentFileOpen_(false),
56  eventSkipperByID_(eventSkipperByID),
57  numberOfEventsToSkip_(numberOfEventsToSkip),
58  currRun_(0),
59  currProto_(0),
60  newHeader_(false),
61  endOfFile_(false) {
62  openStreamerFile(names.at(0));
63  ++currentFile_;
65  currRun_ = startMsg_->run();
66  currProto_ = startMsg_->protocolVersion();
67  }
std::vector< char > eventBuf_
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
boost::shared_ptr< EventMsgView > currentEvMsg_
std::vector< std::string > streamerNames_
boost::shared_ptr< InitMsgView > startMsg_
std::vector< char > headerBuf_
void openStreamerFile(std::string const &name)
static const HistoName names[]
edm::StreamerInputFile::~StreamerInputFile ( )

Definition at line 18 of file StreamerInputFile.cc.

References currentFileOpen_, logFileAction(), and storage_.

18  {
19  if (storage_) {
20  storage_->close();
21  if (currentFileOpen_) logFileAction(" Closed file ");
22  }
23  }
boost::shared_ptr< Storage > storage_
void logFileAction(char const *msg)

Member Function Documentation

bool edm::StreamerInputFile::compareHeader ( )
private

Compares the current file header with the newly opened file header. Returns false in case of mismatch.

Definition at line 205 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, edm::hlt::Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.

Referenced by openNextFile().

205  {
206 
207  //Get the new header
209 
210  //Values from new Header should match up
211  if (currRun_ != startMsg_->run() ||
212  currProto_ != startMsg_->protocolVersion()) {
213  throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
214  << "File " << streamerNames_.at(currentFile_)
215  << "\nhas different run number or protocol version than previous\n";
216  return false;
217  }
218  newHeader_ = true;
219  return true;
220  }
std::vector< std::string > streamerNames_
boost::shared_ptr< InitMsgView > startMsg_
EventMsgView const* edm::StreamerInputFile::currentRecord ( ) const
inline

Points to current Record

Definition at line 37 of file StreamerInputFile.h.

References currentEvMsg_.

Referenced by WatcherStreamFileReader::getNextEvent().

37 { return currentEvMsg_.get(); }
boost::shared_ptr< EventMsgView > currentEvMsg_
void edm::StreamerInputFile::logFileAction ( char const *  msg)
private

Definition at line 289 of file StreamerInputFile.cc.

References currentFileName_, and edm::FlushMessageLog().

Referenced by openStreamerFile(), and ~StreamerInputFile().

289  {
290  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
291  FlushMessageLog();
292  }
void FlushMessageLog()
bool const edm::StreamerInputFile::newHeader ( )
inline

Test bit if a new header is encountered

Definition at line 40 of file StreamerInputFile.h.

References newHeader_, and tmp.

Referenced by WatcherStreamFileReader::newHeader().

40 { bool tmp = newHeader_; newHeader_ = false; return tmp;}
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
bool edm::StreamerInputFile::next ( void  )

Definition at line 167 of file StreamerInputFile.cc.

References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().

Referenced by WatcherStreamFileReader::getNextEvent().

167  {
168  if (this->readEventMessage()) {
169  return true;
170  }
171  if (multiStreams_) {
172  //Try opening next file
173  if (openNextFile()) {
174  endOfFile_ = false;
175  if (this->readEventMessage()) {
176  return true;
177  }
178  }
179  }
180  return false;
181  }
bool edm::StreamerInputFile::openNextFile ( )
private

Definition at line 183 of file StreamerInputFile.cc.

References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.

Referenced by next().

183  {
184 
185  if (currentFile_ <= streamerNames_.size() - 1) {
186  FDEBUG(10) << "Opening file "
187  << streamerNames_.at(currentFile_).c_str() << std::endl;
188 
190 
191  // If start message was already there, then compare the
192  // previous and new headers
193  if (startMsg_) {
194  FDEBUG(10) << "Comparing Header" << std::endl;
195  if (!compareHeader()) {
196  return false;
197  }
198  }
199  ++currentFile_;
200  return true;
201  }
202  return false;
203  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< std::string > streamerNames_
boost::shared_ptr< InitMsgView > startMsg_
void openStreamerFile(std::string const &name)
void edm::StreamerInputFile::openStreamerFile ( std::string const &  name)
private

Test bit if a new header is encountered

Definition at line 70 of file StreamerInputFile.cc.

References edm::check(), currentFileName_, currentFileOpen_, edm::hlt::Exception, cms::Exception::explainSelf(), edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), mergeVDriftHistosByStation::name, IOFlags::OpenRead, findQualityFiles::size, and storage_.

Referenced by openNextFile(), and StreamerInputFile().

70  {
71 
72  if (storage_) {
73  storage_->close();
74  if (currentFileOpen_) logFileAction(" Closed file ");
75  }
76 
78  currentFileOpen_ = false;
79  logFileAction(" Initiating request to open file ");
80 
81  IOOffset size = -1;
82  if (StorageFactory::get()->check(name.c_str(), &size)) {
83  try {
84  storage_.reset(StorageFactory::get()->open(name.c_str(),
86  }
87  catch (cms::Exception& e) {
88  throw Exception(errors::FileOpenError,"StreamerInputFile::openStreamerFile")
89  << "Error Opening Streamer Input File: " << name << "\n"
90  << e.explainSelf() << "\n";
91  }
92  }
93  else {
94  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
95  << "Error Opening Streamer Input File, file does not exist: "
96  << name << "\n";
97  }
98  currentFileOpen_ = true;
99  logFileAction(" Successfully opened file ");
100  }
virtual std::string explainSelf() const
Definition: Exception.cc:56
static StorageFactory * get(void)
boost::shared_ptr< Storage > storage_
static void check(Principal const &p, std::string const &id)
void logFileAction(char const *msg)
int64_t IOOffset
Definition: IOTypes.h:19
tuple size
Write out results.
IOSize edm::StreamerInputFile::readBytes ( char *  buf,
IOSize  nBytes 
)
private

Definition at line 102 of file StreamerInputFile.cc.

References edm::hlt::Exception, cms::Exception::explainSelf(), edm::errors::FileReadError, n, and storage_.

Referenced by readEventMessage(), and readStartMessage().

102  {
103  IOSize n = 0;
104  try {
105  n = storage_->read(buf, nBytes);
106  }
107  catch (cms::Exception& ce) {
108  throw Exception(errors::FileReadError, "StreamerInputFile::readBytes")
109  << "Failed reading streamer file in function readBytes\n"
110  << ce.explainSelf() << "\n";
111  }
112  return n;
113  }
virtual std::string explainSelf() const
Definition: Exception.cc:56
boost::shared_ptr< Storage > storage_
size_t IOSize
Definition: IOTypes.h:14
int edm::StreamerInputFile::readEventMessage ( )
private

Definition at line 223 of file StreamerInputFile.cc.

References HeaderView::code(), convert32(), currentEvMsg_, endOfFile_, Header::EOFRECORD, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, edm::hlt::Exception, edm::errors::FileReadError, EventHeader::lumi_, numberOfEventsToSkip_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().

Referenced by next().

223  {
224  if (endOfFile_) return 0;
225 
226  bool eventRead = false;
227  while (!eventRead) {
228 
229  IOSize nWant = sizeof(EventHeader);
230  IOSize nGot = readBytes(&eventBuf_[0], nWant);
231  if (nGot != nWant) {
232  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
233  << "Failed reading streamer file, first read in readEventMessage\n"
234  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
235  }
236  HeaderView head(&eventBuf_[0]);
237  uint32 code = head.code();
238 
239  // When we get the EOF record we know we have read all events
240  // normally and are at the end, return 0 to indicate this
241  if (code == Header::EOFRECORD) {
242  endOfFile_ = true;
243  return 0;
244  }
245  // If it is not an event nor EOFRECORD then something is wrong.
246  if (code != Header::EVENT) {
247  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
248  << "Failed reading streamer file, unknown code in event header\n"
249  << "code = " << code << "\n";
250  }
251  uint32 eventSize = head.size();
252  if (eventSize <= sizeof(EventHeader)) {
253  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
254  << "Failed reading streamer file, event header size from data too small\n";
255  }
256  eventRead = true;
257  if (eventSkipperByID_) {
258  EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
259  if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
260  eventRead = false;
261  }
262  }
263  if (eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
264  eventRead = false;
265  --(*numberOfEventsToSkip_);
266  }
267  nWant = eventSize - sizeof(EventHeader);
268  if (eventRead) {
269  if (eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
270  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
271  if (nGot != nWant) {
272  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
273  << "Failed reading streamer file, second read in readEventMessage\n"
274  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
275  }
276  } else {
277  nGot = skipBytes(nWant);
278  if (nGot != nWant) {
279  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
280  << "Failed reading streamer file, skip event in readEventMessage\n"
281  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
282  }
283  }
284  }
285  currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
286  return 1;
287  }
std::vector< char > eventBuf_
char_uint32 event_
Definition: EventMessage.h:62
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:63
boost::shared_ptr< EventMsgView > currentEvMsg_
IOOffset skipBytes(IOSize nBytes)
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:61
IOSize readBytes(char *buf, IOSize nBytes)
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:30
size_t IOSize
Definition: IOTypes.h:14
void edm::StreamerInputFile::readStartMessage ( )
private

Not an init message should return

Definition at line 131 of file StreamerInputFile.cc.

References HeaderView::code(), edm::hlt::Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.

Referenced by compareHeader(), and StreamerInputFile().

131  {
132  IOSize nWant = sizeof(HeaderView);
133  IOSize nGot = readBytes(&headerBuf_[0], nWant);
134  if (nGot != nWant) {
135  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
136  << "Failed reading streamer file, first read in readStartMessage\n";
137  }
138 
139  HeaderView head(&headerBuf_[0]);
140  uint32 code = head.code();
141  if (code != Header::INIT)
142  {
143  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
144  << "Expecting an init Message at start of file\n";
145  return;
146  }
147 
148  uint32 headerSize = head.size();
149  if (headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
150 
151  if (headerSize > sizeof(HeaderView)) {
152  nWant = headerSize - sizeof(HeaderView);
153  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
154  if (nGot != nWant) {
155  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
156  << "Failed reading streamer file, second read in readStartMessage\n";
157  }
158  }
159  else {
160  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
161  << "Failed reading streamer file, init header size from data too small\n";
162  }
163 
164  startMsg_.reset(new InitMsgView(&headerBuf_[0]));
165  }
unsigned int uint32
Definition: MsgTools.h:13
boost::shared_ptr< InitMsgView > startMsg_
IOSize readBytes(char *buf, IOSize nBytes)
std::vector< char > headerBuf_
size_t IOSize
Definition: IOTypes.h:14
IOOffset edm::StreamerInputFile::skipBytes ( IOSize  nBytes)
private

Definition at line 115 of file StreamerInputFile.cc.

References Storage::CURRENT, edm::hlt::Exception, cms::Exception::explainSelf(), edm::errors::FileReadError, n, and storage_.

Referenced by readEventMessage().

115  {
116  IOOffset n = 0;
117  try {
118  // We wish to return the number of bytes skipped, not the final offset.
119  n = storage_->position(0, Storage::CURRENT);
120  n = storage_->position(nBytes, Storage::CURRENT) - n;
121  }
122  catch (cms::Exception& ce) {
123  throw Exception(errors::FileReadError, "StreamerInputFile::skipBytes")
124  << "Failed reading streamer file in function skipBytes\n"
125  << ce.explainSelf() << "\n";
126  }
127  return n;
128  }
virtual std::string explainSelf() const
Definition: Exception.cc:56
boost::shared_ptr< Storage > storage_
int64_t IOOffset
Definition: IOTypes.h:19
InitMsgView const* edm::StreamerInputFile::startMessage ( ) const
inline

Points to File Start Header/Message

Definition at line 34 of file StreamerInputFile.h.

References startMsg_.

Referenced by WatcherStreamFileReader::getHeader().

34 { return startMsg_.get(); }
boost::shared_ptr< InitMsgView > startMsg_

Member Data Documentation

boost::shared_ptr<EventMsgView> edm::StreamerInputFile::currentEvMsg_
private

Definition at line 60 of file StreamerInputFile.h.

Referenced by currentRecord(), and readEventMessage().

unsigned int edm::StreamerInputFile::currentFile_
private

Keeps track of which file is in use at the moment

Definition at line 65 of file StreamerInputFile.h.

Referenced by compareHeader(), openNextFile(), and StreamerInputFile().

std::string edm::StreamerInputFile::currentFileName_
private

True if Multiple Streams are Read

Definition at line 68 of file StreamerInputFile.h.

Referenced by logFileAction(), and openStreamerFile().

bool edm::StreamerInputFile::currentFileOpen_
private

Definition at line 69 of file StreamerInputFile.h.

Referenced by openStreamerFile(), and ~StreamerInputFile().

uint32 edm::StreamerInputFile::currProto_
private

Definition at line 76 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

uint32 edm::StreamerInputFile::currRun_
private

Definition at line 75 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

bool edm::StreamerInputFile::endOfFile_
private

Definition at line 82 of file StreamerInputFile.h.

Referenced by next(), and readEventMessage().

std::vector<char> edm::StreamerInputFile::eventBuf_
private

Buffer to store Event Data

Definition at line 63 of file StreamerInputFile.h.

Referenced by readEventMessage().

boost::shared_ptr<EventSkipperByID> edm::StreamerInputFile::eventSkipperByID_
private

Definition at line 71 of file StreamerInputFile.h.

Referenced by readEventMessage().

std::vector<char> edm::StreamerInputFile::headerBuf_
private

Definition at line 62 of file StreamerInputFile.h.

Referenced by readStartMessage().

bool edm::StreamerInputFile::multiStreams_
private

True if Multiple Streams are Read

Definition at line 67 of file StreamerInputFile.h.

Referenced by next().

bool edm::StreamerInputFile::newHeader_
private

Definition at line 78 of file StreamerInputFile.h.

Referenced by compareHeader(), and newHeader().

int* edm::StreamerInputFile::numberOfEventsToSkip_
private

Definition at line 73 of file StreamerInputFile.h.

Referenced by readEventMessage().

boost::shared_ptr<InitMsgView> edm::StreamerInputFile::startMsg_
private
boost::shared_ptr<Storage> edm::StreamerInputFile::storage_
private

Definition at line 80 of file StreamerInputFile.h.

Referenced by openStreamerFile(), readBytes(), skipBytes(), and ~StreamerInputFile().

std::vector<std::string> edm::StreamerInputFile::streamerNames_
private

Names of Streamer files

Definition at line 66 of file StreamerInputFile.h.

Referenced by compareHeader(), and openNextFile().