CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputFile Class Reference

#include <StreamerInputFile.h>

Public Member Functions

void closeStreamerFile ()
 Needs to be public because of forking. More...
 
EventMsgView const * currentRecord () const
 
bool newHeader ()
 
bool next ()
 
InitMsgView const * startMessage () const
 
 StreamerInputFile (std::string const &name, std::string const &LFN, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
 
 StreamerInputFile (std::string const &name, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
 
 StreamerInputFile (std::vector< FileCatalogItem > const &names, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
 
 ~StreamerInputFile ()
 

Private Member Functions

bool compareHeader ()
 
void logFileAction (char const *msg)
 
bool openNextFile ()
 
void openStreamerFile (std::string const &name, std::string const &LFN)
 
IOSize readBytes (char *buf, IOSize nBytes)
 
int readEventMessage ()
 
void readStartMessage ()
 
IOOffset skipBytes (IOSize nBytes)
 

Private Attributes

edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
 
unsigned int currentFile_
 
std::string currentFileName_
 
bool currentFileOpen_
 
uint32 currProto_
 
uint32 currRun_
 
bool endOfFile_
 
std::vector< char > eventBuf_
 
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
 
std::vector< char > headerBuf_
 
bool multiStreams_
 
bool newHeader_
 
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
 
edm::propagate_const< std::unique_ptr< Storage > > storage_
 
std::vector< FileCatalogItem > streamerNames_
 

Detailed Description

Definition at line 19 of file StreamerInputFile.h.

Constructor & Destructor Documentation

edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
std::string const &  LFN,
std::shared_ptr< EventSkipperByID >  eventSkipperByID = std::shared_ptr<EventSkipperByID>() 
)
explicit

Reads a Streamer file

Definition at line 21 of file StreamerInputFile.cc.

References openStreamerFile(), and readStartMessage().

24  : startMsg_(),
25  currentEvMsg_(),
26  headerBuf_(1000 * 1000),
27  eventBuf_(1000 * 1000 * 7),
28  currentFile_(0),
30  multiStreams_(false),
32  currentFileOpen_(false),
33  eventSkipperByID_(eventSkipperByID),
34  currRun_(0),
35  currProto_(0),
36  newHeader_(false),
37  storage_(),
38  endOfFile_(false) {
39  openStreamerFile(name, LFN);
41  }
std::vector< char > eventBuf_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
void openStreamerFile(std::string const &name, std::string const &LFN)
std::vector< char > headerBuf_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
std::vector< FileCatalogItem > streamerNames_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
edm::propagate_const< std::unique_ptr< Storage > > storage_
edm::StreamerInputFile::StreamerInputFile ( std::string const &  name,
std::shared_ptr< EventSkipperByID >  eventSkipperByID = std::shared_ptr<EventSkipperByID>() 
)
explicit

Definition at line 43 of file StreamerInputFile.cc.

44  : StreamerInputFile(name, name, eventSkipperByID) {}
StreamerInputFile(std::string const &name, std::string const &LFN, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
edm::StreamerInputFile::StreamerInputFile ( std::vector< FileCatalogItem > const &  names,
std::shared_ptr< EventSkipperByID >  eventSkipperByID = std::shared_ptr<EventSkipperByID>() 
)
explicit

Multiple Streamer files

Definition at line 46 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, openStreamerFile(), readStartMessage(), and startMsg_.

48  : startMsg_(),
49  currentEvMsg_(),
50  headerBuf_(1000 * 1000),
51  eventBuf_(1000 * 1000 * 7),
52  currentFile_(0),
54  multiStreams_(true),
56  currentFileOpen_(false),
57  eventSkipperByID_(eventSkipperByID),
58  currRun_(0),
59  currProto_(0),
60  newHeader_(false),
61  endOfFile_(false) {
62  openStreamerFile(names.at(0).fileName(), names.at(0).logicalFileName());
63  ++currentFile_;
65  currRun_ = startMsg_->run();
66  currProto_ = startMsg_->protocolVersion();
67  }
std::vector< char > eventBuf_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
const std::string names[nVars_]
void openStreamerFile(std::string const &name, std::string const &LFN)
std::vector< char > headerBuf_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
std::vector< FileCatalogItem > streamerNames_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
edm::StreamerInputFile::~StreamerInputFile ( )

Definition at line 19 of file StreamerInputFile.cc.

References closeStreamerFile().

19 { closeStreamerFile(); }
void closeStreamerFile()
Needs to be public because of forking.

Member Function Documentation

void edm::StreamerInputFile::closeStreamerFile ( )

Needs to be public because of forking.

Test bit if a new header is encountered

Definition at line 105 of file StreamerInputFile.cc.

References currentFileOpen_, logFileAction(), and storage_.

Referenced by newHeader(), openStreamerFile(), and ~StreamerInputFile().

105  {
106  if (currentFileOpen_ && storage_) {
107  storage_->close();
108  logFileAction(" Closed file ");
109  }
110  currentFileOpen_ = false;
111  }
void logFileAction(char const *msg)
edm::propagate_const< std::unique_ptr< Storage > > storage_
bool edm::StreamerInputFile::compareHeader ( )
private

Compares the current file header with the newly opened file header. Returns false in case of mismatch.

Definition at line 211 of file StreamerInputFile.cc.

References currentFile_, currProto_, currRun_, Exception, edm::errors::MismatchedInputFiles, newHeader_, readStartMessage(), startMsg_, and streamerNames_.

Referenced by newHeader(), and openNextFile().

211  {
212  //Get the new header
214 
215  //Values from new Header should match up
216  if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
217  throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
218  << "File " << streamerNames_.at(currentFile_).fileName()
219  << "\nhas different run number or protocol version than previous\n";
220  return false;
221  }
222  newHeader_ = true;
223  return true;
224  }
std::vector< FileCatalogItem > streamerNames_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
EventMsgView const* edm::StreamerInputFile::currentRecord ( ) const
inline

Points to File Start Header/Message

Definition at line 39 of file StreamerInputFile.h.

References currentEvMsg_, and edm::propagate_const< T >::get().

Referenced by WatcherStreamFileReader::getNextEvent().

39 { return currentEvMsg_.get(); }
element_type const * get() const
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
void edm::StreamerInputFile::logFileAction ( char const *  msg)
private

Definition at line 288 of file StreamerInputFile.cc.

References currentFileName_, and edm::FlushMessageLog().

Referenced by closeStreamerFile(), newHeader(), and openStreamerFile().

288  {
289  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
290  FlushMessageLog();
291  }
void FlushMessageLog()
tuple msg
Definition: mps_check.py:285
bool edm::StreamerInputFile::newHeader ( )
inline
bool edm::StreamerInputFile::next ( void  )

Definition at line 175 of file StreamerInputFile.cc.

References endOfFile_, multiStreams_, openNextFile(), and readEventMessage().

Referenced by WatcherStreamFileReader::getNextEvent().

175  {
176  if (this->readEventMessage()) {
177  return true;
178  }
179  if (multiStreams_) {
180  //Try opening next file
181  if (openNextFile()) {
182  endOfFile_ = false;
183  if (this->readEventMessage()) {
184  return true;
185  }
186  }
187  }
188  return false;
189  }
bool edm::StreamerInputFile::openNextFile ( )
private

Definition at line 191 of file StreamerInputFile.cc.

References compareHeader(), currentFile_, FDEBUG, openStreamerFile(), startMsg_, and streamerNames_.

Referenced by newHeader(), and next().

191  {
192  if (currentFile_ <= streamerNames_.size() - 1) {
193  FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileName().c_str() << std::endl;
194 
195  openStreamerFile(streamerNames_.at(currentFile_).fileName(), streamerNames_.at(currentFile_).logicalFileName());
196 
197  // If start message was already there, then compare the
198  // previous and new headers
199  if (startMsg_) {
200  FDEBUG(10) << "Comparing Header" << std::endl;
201  if (!compareHeader()) {
202  return false;
203  }
204  }
205  ++currentFile_;
206  return true;
207  }
208  return false;
209  }
void openStreamerFile(std::string const &name, std::string const &LFN)
std::vector< FileCatalogItem > streamerNames_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
void edm::StreamerInputFile::openStreamerFile ( std::string const &  name,
std::string const &  LFN 
)
private

Definition at line 69 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::check(), cms::Exception::clearMessage(), closeStreamerFile(), currentFileName_, currentFileOpen_, MillePedeFileConverter_cfg::e, Exception, edm::errors::FileOpenError, StorageFactory::get(), logFileAction(), Skims_PA_cff::name, StorageFactory::open(), IOFlags::OpenRead, findQualityFiles::size, and storage_.

Referenced by newHeader(), openNextFile(), and StreamerInputFile().

69  {
71 
73 
74  // Check if the logical file name was found.
75  if (currentFileName_.empty()) {
76  // LFN not found in catalog.
77  throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
78  << "Logical file name '" << LFN << "' was not found in the file catalog.\n"
79  << "If you wanted a local file, you forgot the 'file:' prefix\n"
80  << "before the file name in your configuration file.\n";
81  return;
82  }
83 
84  logFileAction(" Initiating request to open file ");
85 
86  IOOffset size = -1;
87  if (StorageFactory::get()->check(name, &size)) {
88  try {
90  } catch (cms::Exception& e) {
92  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
93  ex.clearMessage();
94  ex << "Error Opening Streamer Input File: " << name << "\n";
95  throw ex;
96  }
97  } else {
98  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
99  << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
100  }
101  currentFileOpen_ = true;
102  logFileAction(" Successfully opened file ");
103  }
size
Write out results.
void closeStreamerFile()
Needs to be public because of forking.
static const StorageFactory * get(void)
void logFileAction(char const *msg)
int64_t IOOffset
Definition: IOTypes.h:19
static void check(T const &p, std::string const &id, SelectedProducts const &iProducts)
edm::propagate_const< std::unique_ptr< Storage > > storage_
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
IOSize edm::StreamerInputFile::readBytes ( char *  buf,
IOSize  nBytes 
)
private

Definition at line 113 of file StreamerInputFile.cc.

References cms::Exception::addContext(), edm::errors::FileReadError, dqmiodumpmetadata::n, and storage_.

Referenced by newHeader(), readEventMessage(), and readStartMessage().

113  {
114  IOSize n = 0;
115  try {
116  n = storage_->read(buf, nBytes);
117  } catch (cms::Exception& ce) {
118  Exception ex(errors::FileReadError, "", ce);
119  ex.addContext("Calling StreamerInputFile::readBytes()");
120  throw ex;
121  }
122  return n;
123  }
size_t IOSize
Definition: IOTypes.h:14
edm::propagate_const< std::unique_ptr< Storage > > storage_
int edm::StreamerInputFile::readEventMessage ( )
private

Definition at line 226 of file StreamerInputFile.cc.

References HeaderView::code(), convert32(), convert64(), currentEvMsg_, endOfFile_, Header::EVENT, EventHeader::event_, eventBuf_, eventSkipperByID_, Exception, edm::errors::FileReadError, EventHeader::lumi_, readBytes(), EventHeader::run_, HeaderView::size(), and skipBytes().

Referenced by newHeader(), and next().

226  {
227  if (endOfFile_)
228  return 0;
229 
230  bool eventRead = false;
231  while (!eventRead) {
232  IOSize nWant = sizeof(EventHeader);
233  IOSize nGot = readBytes(&eventBuf_[0], nWant);
234  if (nGot == 0) {
235  // no more data available
236  endOfFile_ = true;
237  return 0;
238  }
239  if (nGot != nWant) {
240  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
241  << "Failed reading streamer file, first read in readEventMessage\n"
242  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
243  }
244  HeaderView head(&eventBuf_[0]);
245  uint32 code = head.code();
246 
247  // If it is not an event then something is wrong.
248  if (code != Header::EVENT) {
249  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
250  << "Failed reading streamer file, unknown code in event header\n"
251  << "code = " << code << "\n";
252  }
253  uint32 eventSize = head.size();
254  if (eventSize <= sizeof(EventHeader)) {
255  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
256  << "Failed reading streamer file, event header size from data too small\n";
257  }
258  eventRead = true;
259  if (eventSkipperByID_) {
260  EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
261  if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
262  eventRead = false;
263  }
264  }
265  nWant = eventSize - sizeof(EventHeader);
266  if (eventRead) {
267  if (eventBuf_.size() < eventSize)
268  eventBuf_.resize(eventSize);
269  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
270  if (nGot != nWant) {
271  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
272  << "Failed reading streamer file, second read in readEventMessage\n"
273  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
274  }
275  } else {
276  nGot = skipBytes(nWant);
277  if (nGot != nWant) {
278  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
279  << "Failed reading streamer file, skip event in readEventMessage\n"
280  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
281  }
282  }
283  }
284  currentEvMsg_ = std::make_shared<EventMsgView>((void*)&eventBuf_[0]); // propagate_const<T> has no reset() function
285  return 1;
286  }
uint64 convert64(char_uint64 v)
Definition: MsgTools.h:20
std::vector< char > eventBuf_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:66
IOOffset skipBytes(IOSize nBytes)
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:64
char_uint64 event_
Definition: EventMessage.h:65
IOSize readBytes(char *buf, IOSize nBytes)
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:28
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
size_t IOSize
Definition: IOTypes.h:14
void edm::StreamerInputFile::readStartMessage ( )
private

Should return if the message is not an init message.

Definition at line 139 of file StreamerInputFile.cc.

References HeaderView::code(), Exception, edm::errors::FileReadError, headerBuf_, Header::INIT, readBytes(), HeaderView::size(), and startMsg_.

Referenced by compareHeader(), newHeader(), and StreamerInputFile().

139  {
140  IOSize nWant = sizeof(HeaderView);
141  IOSize nGot = readBytes(&headerBuf_[0], nWant);
142  if (nGot != nWant) {
143  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
144  << "Failed reading streamer file, first read in readStartMessage\n";
145  }
146 
147  HeaderView head(&headerBuf_[0]);
148  uint32 code = head.code();
149  if (code != Header::INIT)
150  {
151  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
152  << "Expecting an init Message at start of file\n";
153  return;
154  }
155 
156  uint32 headerSize = head.size();
157  if (headerBuf_.size() < headerSize)
158  headerBuf_.resize(headerSize);
159 
160  if (headerSize > sizeof(HeaderView)) {
161  nWant = headerSize - sizeof(HeaderView);
162  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
163  if (nGot != nWant) {
164  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
165  << "Failed reading streamer file, second read in readStartMessage\n";
166  }
167  } else {
168  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
169  << "Failed reading streamer file, init header size from data too small\n";
170  }
171 
172  startMsg_ = std::make_shared<InitMsgView>(&headerBuf_[0]); // propagate_const<T> has no reset() function
173  }
unsigned int uint32
Definition: MsgTools.h:13
IOSize readBytes(char *buf, IOSize nBytes)
std::vector< char > headerBuf_
size_t IOSize
Definition: IOTypes.h:14
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
IOOffset edm::StreamerInputFile::skipBytes ( IOSize  nBytes)
private

Definition at line 125 of file StreamerInputFile.cc.

References cms::Exception::addContext(), Storage::CURRENT, edm::errors::FileReadError, dqmiodumpmetadata::n, and storage_.

Referenced by newHeader(), and readEventMessage().

125  {
126  IOOffset n = 0;
127  try {
128  // We wish to return the number of bytes skipped, not the final offset.
129  n = storage_->position(0, Storage::CURRENT);
130  n = storage_->position(nBytes, Storage::CURRENT) - n;
131  } catch (cms::Exception& ce) {
132  Exception ex(errors::FileReadError, "", ce);
133  ex.addContext("Calling StreamerInputFile::skipBytes()");
134  throw ex;
135  }
136  return n;
137  }
int64_t IOOffset
Definition: IOTypes.h:19
edm::propagate_const< std::unique_ptr< Storage > > storage_
InitMsgView const* edm::StreamerInputFile::startMessage ( ) const
inline

Moves the handler to next Event Record

Definition at line 36 of file StreamerInputFile.h.

References edm::propagate_const< T >::get(), and startMsg_.

Referenced by WatcherStreamFileReader::getHeader().

36 { return startMsg_.get(); }
element_type const * get() const
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_

Member Data Documentation

edm::propagate_const<std::shared_ptr<EventMsgView> > edm::StreamerInputFile::currentEvMsg_
private

Definition at line 67 of file StreamerInputFile.h.

Referenced by currentRecord(), and readEventMessage().

unsigned int edm::StreamerInputFile::currentFile_
private

Buffer to store Event Data

Definition at line 72 of file StreamerInputFile.h.

Referenced by compareHeader(), openNextFile(), and StreamerInputFile().

std::string edm::StreamerInputFile::currentFileName_
private

True if Multiple Streams are Read

Definition at line 75 of file StreamerInputFile.h.

Referenced by logFileAction(), and openStreamerFile().

bool edm::StreamerInputFile::currentFileOpen_
private

Definition at line 76 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), and openStreamerFile().

uint32 edm::StreamerInputFile::currProto_
private

Definition at line 81 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

uint32 edm::StreamerInputFile::currRun_
private

Definition at line 80 of file StreamerInputFile.h.

Referenced by compareHeader(), and StreamerInputFile().

bool edm::StreamerInputFile::endOfFile_
private

Definition at line 87 of file StreamerInputFile.h.

Referenced by next(), and readEventMessage().

std::vector<char> edm::StreamerInputFile::eventBuf_
private

Buffer to store file Header

Definition at line 70 of file StreamerInputFile.h.

Referenced by readEventMessage().

edm::propagate_const<std::shared_ptr<EventSkipperByID> > edm::StreamerInputFile::eventSkipperByID_
private

Definition at line 78 of file StreamerInputFile.h.

Referenced by readEventMessage().

std::vector<char> edm::StreamerInputFile::headerBuf_
private

Definition at line 69 of file StreamerInputFile.h.

Referenced by readStartMessage().

bool edm::StreamerInputFile::multiStreams_
private

names of Streamer files

Definition at line 74 of file StreamerInputFile.h.

Referenced by next().

bool edm::StreamerInputFile::newHeader_
private

Definition at line 83 of file StreamerInputFile.h.

Referenced by compareHeader(), and newHeader().

edm::propagate_const<std::shared_ptr<InitMsgView> > edm::StreamerInputFile::startMsg_
private
edm::propagate_const<std::unique_ptr<Storage> > edm::StreamerInputFile::storage_
private

Definition at line 85 of file StreamerInputFile.h.

Referenced by closeStreamerFile(), openStreamerFile(), readBytes(), and skipBytes().

std::vector<FileCatalogItem> edm::StreamerInputFile::streamerNames_
private

keeps track of which file is in use at the moment

Definition at line 73 of file StreamerInputFile.h.

Referenced by compareHeader(), and openNextFile().