CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputFile Class Reference

#include <StreamerInputFile.h>

Public Member Functions

void closeStreamerFile ()
 Needs to be public because of forking. More...
 
EventMsgView const * currentRecord () const
 
bool newHeader ()
 
bool next ()
 
InitMsgView const * startMessage () const
 
 StreamerInputFile (std::string const &name, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
 
 StreamerInputFile (std::vector< std::string > const &names, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
 
 ~StreamerInputFile ()
 

Private Member Functions

bool compareHeader ()
 
void logFileAction (char const *msg)
 
bool openNextFile ()
 
void openStreamerFile (std::string const &name)
 
IOSize readBytes (char *buf, IOSize nBytes)
 
int readEventMessage ()
 
void readStartMessage ()
 
IOOffset skipBytes (IOSize nBytes)
 

Private Attributes

edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
 
unsigned int currentFile_
 
std::string currentFileName_
 
bool currentFileOpen_
 
uint32 currProto_
 
uint32 currRun_
 
bool endOfFile_
 
std::vector< char > eventBuf_
 
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
 
std::vector< char > headerBuf_
 
bool multiStreams_
 
bool newHeader_
 
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
 
edm::propagate_const< std::unique_ptr< Storage > > storage_
 
std::vector< std::string > streamerNames_
 

Detailed Description

Definition at line 18 of file StreamerInputFile.h.

Constructor & Destructor Documentation

edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
std::shared_ptr< EventSkipperByID > eventSkipperByID = std::shared_ptr<EventSkipperByID>() 
)
explicit

Reads a Streamer file

Definition at line 20 of file StreamerInputFile.cc.

References openStreamerFile(), and readStartMessage().

21  : startMsg_(),
22  currentEvMsg_(),
23  headerBuf_(1000 * 1000),
24  eventBuf_(1000 * 1000 * 7),
25  currentFile_(0),
27  multiStreams_(false),
29  currentFileOpen_(false),
30  eventSkipperByID_(eventSkipperByID),
31  currRun_(0),
32  currProto_(0),
33  newHeader_(false),
34  storage_(),
35  endOfFile_(false) {
38  }
std::vector< char > eventBuf_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
std::vector< std::string > streamerNames_
std::vector< char > headerBuf_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
void openStreamerFile(std::string const &name)
edm::propagate_const< std::unique_ptr< Storage > > storage_
edm::StreamerInputFile::StreamerInputFile ( std::vector< std::string > const &  names,
std::shared_ptr< EventSkipperByID > eventSkipperByID = std::shared_ptr<EventSkipperByID>() 
)
explicit

Multiple Streamer files

Definition at line 40 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.

42  : startMsg_(),
43  currentEvMsg_(),
44  headerBuf_(1000 * 1000),
45  eventBuf_(1000 * 1000 * 7),
46  currentFile_(0),
48  multiStreams_(true),
50  currentFileOpen_(false),
51  eventSkipperByID_(eventSkipperByID),
52  currRun_(0),
53  currProto_(0),
54  newHeader_(false),
55  endOfFile_(false) {
56  openStreamerFile(names.at(0));
57  ++currentFile_;
59  currRun_ = startMsg_->run();
60  currProto_ = startMsg_->protocolVersion();
61  }
std::vector< char > eventBuf_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
const std::string names[nVars_]
std::vector< std::string > streamerNames_
std::vector< char > headerBuf_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
void openStreamerFile(std::string const &name)
edm::StreamerInputFile::~StreamerInputFile ( )

Definition at line 18 of file StreamerInputFile.cc.

References closeStreamerFile().

18 { closeStreamerFile(); }
void closeStreamerFile()
Needs to be public because of forking.

Member Function Documentation

void edm::StreamerInputFile::closeStreamerFile ( )

Needs to be public because of forking.

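Closes the storage if a file is currently open and logs the action; the file-open flag is cleared in every case. (The generated sentence "Test bit if a new header is encountered" describes newHeader(), not this function.)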

Definition at line 88 of file StreamerInputFile.cc.

References currentFileOpen_, logFileAction(), and storage_.

Referenced by newHeader(), openStreamerFile(), and ~StreamerInputFile().

88  {
89  if (currentFileOpen_ && storage_) {
90  storage_->close();
91  logFileAction(" Closed file ");
92  }
93  currentFileOpen_ = false;
94  }
void logFileAction(char const *msg)
edm::propagate_const< std::unique_ptr< Storage > > storage_
bool edm::StreamerInputFile::compareHeader ( )
private

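Compares the current file header with the newly opened file header. If the run number or protocol version differs, a MismatchedInputFiles exception is thrown (the "return false" that follows the throw in the listing is never reached); otherwise newHeader_ is set and true is returned.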

Definition at line 194 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.

Referenced by newHeader(), and openNextFile().

194  {
195  //Get the new header
197 
198  //Values from new Header should match up
199  if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
200  throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
201  << "File " << streamerNames_.at(currentFile_)
202  << "\nhas different run number or protocol version than previous\n";
203  return false;
204  }
205  newHeader_ = true;
206  return true;
207  }
std::vector< std::string > streamerNames_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
EventMsgView const* edm::StreamerInputFile::currentRecord ( ) const
inline

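Points to the current event record. (The generated sentence "Points to File Start Header/Message" describes startMessage(), not this function.)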

Definition at line 35 of file StreamerInputFile.h.

References currentEvMsg_, and edm::propagate_const< T >::get().

Referenced by WatcherStreamFileReader::getNextEvent().

35 { return currentEvMsg_.get(); }
element_type const * get() const
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
void edm::StreamerInputFile::logFileAction ( char const *  msg)
private

Definition at line 271 of file StreamerInputFile.cc.

References currentFileName_, and edm::FlushMessageLog().

Referenced by closeStreamerFile(), newHeader(), and openStreamerFile().

271  {
272  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
273  FlushMessageLog();
274  }
void FlushMessageLog()
tuple msg
Definition: mps_check.py:285
bool edm::StreamerInputFile::newHeader ( )
inline

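Test bit if a new header is encountered: returns the current value of newHeader_ and resets it to false. (The generated sentence "Points to current Record" describes currentRecord(), not this function.)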

Definition at line 38 of file StreamerInputFile.h.

References closeStreamerFile(), compareHeader(), logFileAction(), mps_check::msg, newHeader_, openNextFile(), openStreamerFile(), readBytes(), readEventMessage(), readStartMessage(), skipBytes(), AlCaHLTBitMon_QueryRunRegistry::string, and tmp.

Referenced by WatcherStreamFileReader::newHeader().

38  {
39  bool tmp = newHeader_;
40  newHeader_ = false;
41  return tmp;
42  }
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
bool edm::StreamerInputFile::next ( void  )

Definition at line 158 of file StreamerInputFile.cc.

References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().

Referenced by WatcherStreamFileReader::getNextEvent().

158  {
159  if (this->readEventMessage()) {
160  return true;
161  }
162  if (multiStreams_) {
163  //Try opening next file
164  if (openNextFile()) {
165  endOfFile_ = false;
166  if (this->readEventMessage()) {
167  return true;
168  }
169  }
170  }
171  return false;
172  }
bool edm::StreamerInputFile::openNextFile ( )
private

Definition at line 174 of file StreamerInputFile.cc.

References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.

Referenced by newHeader(), and next().

174  {
175  if (currentFile_ <= streamerNames_.size() - 1) {
176  FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).c_str() << std::endl;
177 
179 
180  // If start message was already there, then compare the
181  // previous and new headers
182  if (startMsg_) {
183  FDEBUG(10) << "Comparing Header" << std::endl;
184  if (!compareHeader()) {
185  return false;
186  }
187  }
188  ++currentFile_;
189  return true;
190  }
191  return false;
192  }
std::vector< std::string > streamerNames_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
void openStreamerFile(std::string const &name)
void edm::StreamerInputFile::openStreamerFile ( std::string const &  name)
private

Definition at line 63 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::check(), cms::Exception::clearMessage(), closeStreamerFile(), currentFileName_, currentFileOpen_, MillePedeFileConverter_cfg::e, Exception, edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), dataset::name, StorageFactory::open(), IOFlags::OpenRead, findQualityFiles::size, and storage_.

Referenced by newHeader(), openNextFile(), and StreamerInputFile().

63  {
65 
67  logFileAction(" Initiating request to open file ");
68 
69  IOOffset size = -1;
70  if (StorageFactory::get()->check(name, &size)) {
71  try {
73  } catch (cms::Exception& e) {
75  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
76  ex.clearMessage();
77  ex << "Error Opening Streamer Input File: " << name << "\n";
78  throw ex;
79  }
80  } else {
81  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
82  << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
83  }
84  currentFileOpen_ = true;
85  logFileAction(" Successfully opened file ");
86  }
size
Write out results.
void closeStreamerFile()
Needs to be public because of forking.
static const StorageFactory * get(void)
void logFileAction(char const *msg)
int64_t IOOffset
Definition: IOTypes.h:19
static void check(T const &p, std::string const &id, SelectedProducts const &iProducts)
edm::propagate_const< std::unique_ptr< Storage > > storage_
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
IOSize edm::StreamerInputFile::readBytes ( char *  buf,
IOSize  nBytes 
)
private

Definition at line 96 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::errors::FileReadError, gen::n, and storage_.

Referenced by newHeader(), readEventMessage(), and readStartMessage().

96  {
97  IOSize n = 0;
98  try {
99  n = storage_->read(buf, nBytes);
100  } catch (cms::Exception& ce) {
101  Exception ex(errors::FileReadError, "", ce);
102  ex.addContext("Calling StreamerInputFile::readBytes()");
103  throw ex;
104  }
105  return n;
106  }
size_t IOSize
Definition: IOTypes.h:14
edm::propagate_const< std::unique_ptr< Storage > > storage_
int edm::StreamerInputFile::readEventMessage ( )
private

Definition at line 209 of file StreamerInputFile.cc.

References HeaderView::code(), convert32(), convert64(), currentEvMsg_, endOfFile_, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, Exception, edm::errors::FileReadError, EventHeader::lumi_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().

Referenced by newHeader(), and next().

209  {
210  if (endOfFile_)
211  return 0;
212 
213  bool eventRead = false;
214  while (!eventRead) {
215  IOSize nWant = sizeof(EventHeader);
216  IOSize nGot = readBytes(&eventBuf_[0], nWant);
217  if (nGot == 0) {
218  // no more data available
219  endOfFile_ = true;
220  return 0;
221  }
222  if (nGot != nWant) {
223  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
224  << "Failed reading streamer file, first read in readEventMessage\n"
225  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
226  }
227  HeaderView head(&eventBuf_[0]);
228  uint32 code = head.code();
229 
230  // If it is not an event then something is wrong.
231  if (code != Header::EVENT) {
232  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
233  << "Failed reading streamer file, unknown code in event header\n"
234  << "code = " << code << "\n";
235  }
236  uint32 eventSize = head.size();
237  if (eventSize <= sizeof(EventHeader)) {
238  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
239  << "Failed reading streamer file, event header size from data too small\n";
240  }
241  eventRead = true;
242  if (eventSkipperByID_) {
243  EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
244  if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
245  eventRead = false;
246  }
247  }
248  nWant = eventSize - sizeof(EventHeader);
249  if (eventRead) {
250  if (eventBuf_.size() < eventSize)
251  eventBuf_.resize(eventSize);
252  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
253  if (nGot != nWant) {
254  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
255  << "Failed reading streamer file, second read in readEventMessage\n"
256  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
257  }
258  } else {
259  nGot = skipBytes(nWant);
260  if (nGot != nWant) {
261  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
262  << "Failed reading streamer file, skip event in readEventMessage\n"
263  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
264  }
265  }
266  }
267  currentEvMsg_ = std::make_shared<EventMsgView>((void*)&eventBuf_[0]); // propagate_const<T> has no reset() function
268  return 1;
269  }
uint64 convert64(char_uint64 v)
Definition: MsgTools.h:20
std::vector< char > eventBuf_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:66
IOOffset skipBytes(IOSize nBytes)
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:64
char_uint64 event_
Definition: EventMessage.h:65
IOSize readBytes(char *buf, IOSize nBytes)
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:28
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
size_t IOSize
Definition: IOTypes.h:14
void edm::StreamerInputFile::readStartMessage ( )
private

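Reads the init message at the start of the file into headerBuf_ and stores a view of it in startMsg_. If the first message is not an INIT message, a FileReadError exception is thrown (the "return" that follows the throw in the listing is never reached).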

Definition at line 122 of file StreamerInputFile.cc.

References HeaderView::code(), Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.

Referenced by compareHeader(), newHeader(), and StreamerInputFile().

122  {
123  IOSize nWant = sizeof(HeaderView);
124  IOSize nGot = readBytes(&headerBuf_[0], nWant);
125  if (nGot != nWant) {
126  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
127  << "Failed reading streamer file, first read in readStartMessage\n";
128  }
129 
130  HeaderView head(&headerBuf_[0]);
131  uint32 code = head.code();
132  if (code != Header::INIT)
133  {
134  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
135  << "Expecting an init Message at start of file\n";
136  return;
137  }
138 
139  uint32 headerSize = head.size();
140  if (headerBuf_.size() < headerSize)
141  headerBuf_.resize(headerSize);
142 
143  if (headerSize > sizeof(HeaderView)) {
144  nWant = headerSize - sizeof(HeaderView);
145  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
146  if (nGot != nWant) {
147  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
148  << "Failed reading streamer file, second read in readStartMessage\n";
149  }
150  } else {
151  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
152  << "Failed reading streamer file, init header size from data too small\n";
153  }
154 
155  startMsg_ = std::make_shared<InitMsgView>(&headerBuf_[0]); // propagate_const<T> has no reset() function
156  }
unsigned int uint32
Definition: MsgTools.h:13
IOSize readBytes(char *buf, IOSize nBytes)
std::vector< char > headerBuf_
size_t IOSize
Definition: IOTypes.h:14
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
IOOffset edm::StreamerInputFile::skipBytes ( IOSize  nBytes)
private

Definition at line 108 of file StreamerInputFile.cc.

References cms::Exception::addContext(), Storage::CURRENT, edm::errors::FileReadError, gen::n, and storage_.

Referenced by newHeader(), and readEventMessage().

108  {
109  IOOffset n = 0;
110  try {
111  // We wish to return the number of bytes skipped, not the final offset.
112  n = storage_->position(0, Storage::CURRENT);
113  n = storage_->position(nBytes, Storage::CURRENT) - n;
114  } catch (cms::Exception& ce) {
115  Exception ex(errors::FileReadError, "", ce);
116  ex.addContext("Calling StreamerInputFile::skipBytes()");
117  throw ex;
118  }
119  return n;
120  }
int64_t IOOffset
Definition: IOTypes.h:19
edm::propagate_const< std::unique_ptr< Storage > > storage_
InitMsgView const* edm::StreamerInputFile::startMessage ( ) const
inline

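Points to the file start header/message. (The generated sentence "Moves the handler to next Event Record" describes next(), not this function.)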

Definition at line 32 of file StreamerInputFile.h.

References edm::propagate_const< T >::get(), and startMsg_.

Referenced by WatcherStreamFileReader::getHeader().

32 { return startMsg_.get(); }
element_type const * get() const
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_

Member Data Documentation

edm::propagate_const<std::shared_ptr<EventMsgView> > edm::StreamerInputFile::currentEvMsg_
private

Definition at line 63 of file StreamerInputFile.h.

Referenced by currentRecord(), and readEventMessage().

unsigned int edm::StreamerInputFile::currentFile_
private

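Keeps track of which file is in use at the moment. (The generated sentence "Buffer to store Event Data" describes eventBuf_, not this member.)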

Definition at line 68 of file StreamerInputFile.h.

Referenced by compareHeader(), openNextFile(), and StreamerInputFile().

std::string edm::StreamerInputFile::currentFileName_
private

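Name appended to the file-action log messages written by logFileAction(). (The generated sentence "True if Multiple Streams are Read" describes multiStreams_, not this member.)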

Definition at line 71 of file StreamerInputFile.h.

Referenced by logFileAction(), and openStreamerFile().

bool edm::StreamerInputFile::currentFileOpen_
private

Definition at line 72 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), and openStreamerFile().

uint32 edm::StreamerInputFile::currProto_
private

Definition at line 77 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

uint32 edm::StreamerInputFile::currRun_
private

Definition at line 76 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

bool edm::StreamerInputFile::endOfFile_
private

Definition at line 83 of file StreamerInputFile.h.

Referenced by next(), and readEventMessage().

std::vector<char> edm::StreamerInputFile::eventBuf_
private

Buffer to store event data. (The generated sentence "Buffer to store file Header" describes headerBuf_, not this member.)

Definition at line 66 of file StreamerInputFile.h.

Referenced by readEventMessage().

edm::propagate_const<std::shared_ptr<EventSkipperByID> > edm::StreamerInputFile::eventSkipperByID_
private

Definition at line 74 of file StreamerInputFile.h.

Referenced by readEventMessage().

std::vector<char> edm::StreamerInputFile::headerBuf_
private

Definition at line 65 of file StreamerInputFile.h.

Referenced by readStartMessage().

bool edm::StreamerInputFile::multiStreams_
private

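True if multiple streams are read. (The generated sentence "names of Streamer files" describes streamerNames_, not this member.)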

Definition at line 70 of file StreamerInputFile.h.

Referenced by next().

bool edm::StreamerInputFile::newHeader_
private

Definition at line 79 of file StreamerInputFile.h.

Referenced by compareHeader(), and newHeader().

edm::propagate_const<std::shared_ptr<InitMsgView> > edm::StreamerInputFile::startMsg_
private
edm::propagate_const<std::unique_ptr<Storage> > edm::StreamerInputFile::storage_
private

Definition at line 81 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), openStreamerFile(), readBytes(), and skipBytes().

std::vector<std::string> edm::StreamerInputFile::streamerNames_
private

Names of the streamer files. (The generated sentence "keeps track of which file is in use at the moment" describes currentFile_, not this member.)

Definition at line 69 of file StreamerInputFile.h.

Referenced by compareHeader(), and openNextFile().