CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputFile.cc
Go to the documentation of this file.
2 
9 
12 
13 #include <iomanip>
14 #include <iostream>
15 
16 namespace edm {
17 
19  if(storage_) {
20  storage_->close();
21  if(currentFileOpen_) logFileAction(" Closed file ");
22  }
23  }
24 
26  int* numberOfEventsToSkip,
27  boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
28  startMsg_(),
29  currentEvMsg_(),
30  headerBuf_(1000*1000),
31  eventBuf_(1000*1000*7),
32  currentFile_(0),
33  streamerNames_(),
34  multiStreams_(false),
35  currentFileName_(),
36  currentFileOpen_(false),
37  eventSkipperByID_(eventSkipperByID),
38  numberOfEventsToSkip_(numberOfEventsToSkip),
39  currRun_(0),
40  currProto_(0),
41  newHeader_(false),
42  storage_(),
43  endOfFile_(false) {
44  openStreamerFile(name);
46  }
47 
48  StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
49  int* numberOfEventsToSkip,
50  boost::shared_ptr<EventSkipperByID> eventSkipperByID) :
51  startMsg_(),
52  currentEvMsg_(),
53  headerBuf_(1000*1000),
54  eventBuf_(1000*1000*7),
55  currentFile_(0),
56  streamerNames_(names),
57  multiStreams_(true),
58  currentFileName_(),
59  currentFileOpen_(false),
60  eventSkipperByID_(eventSkipperByID),
61  numberOfEventsToSkip_(numberOfEventsToSkip),
62  currRun_(0),
63  currProto_(0),
64  newHeader_(false),
65  endOfFile_(false) {
66  openStreamerFile(names.at(0));
67  ++currentFile_;
69  currRun_ = startMsg_->run();
70  currProto_ = startMsg_->protocolVersion();
71  }
72 
73  void
75 
76  if(storage_) {
77  storage_->close();
78  if(currentFileOpen_) logFileAction(" Closed file ");
79  }
80 
82  currentFileOpen_ = false;
83  logFileAction(" Initiating request to open file ");
84 
85  IOOffset size = -1;
86  if(StorageFactory::get()->check(name.c_str(), &size)) {
87  try {
88  storage_.reset(StorageFactory::get()->open(name.c_str(),
90  }
91  catch(cms::Exception& e) {
93  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
94  ex.clearMessage();
95  ex << "Error Opening Streamer Input File: " << name << "\n";
96  throw ex;
97  }
98  } else {
99  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
100  << "Error Opening Streamer Input File, file does not exist: "
101  << name << "\n";
102  }
103  currentFileOpen_ = true;
104  logFileAction(" Successfully opened file ");
105  }
106 
108  IOSize n = 0;
109  try {
110  n = storage_->read(buf, nBytes);
111  }
112  catch(cms::Exception& ce) {
113  Exception ex(errors::FileReadError, "", ce);
114  ex.addContext("Calling StreamerInputFile::readBytes()");
115  throw ex;
116  }
117  return n;
118  }
119 
121  IOOffset n = 0;
122  try {
123  // We wish to return the number of bytes skipped, not the final offset.
124  n = storage_->position(0, Storage::CURRENT);
125  n = storage_->position(nBytes, Storage::CURRENT) - n;
126  }
127  catch(cms::Exception& ce) {
128  Exception ex(errors::FileReadError, "", ce);
129  ex.addContext("Calling StreamerInputFile::skipBytes()");
130  throw ex;
131  }
132  return n;
133  }
134 
136  IOSize nWant = sizeof(HeaderView);
137  IOSize nGot = readBytes(&headerBuf_[0], nWant);
138  if(nGot != nWant) {
139  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
140  << "Failed reading streamer file, first read in readStartMessage\n";
141  }
142 
143  HeaderView head(&headerBuf_[0]);
144  uint32 code = head.code();
145  if(code != Header::INIT)
146  {
147  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
148  << "Expecting an init Message at start of file\n";
149  return;
150  }
151 
152  uint32 headerSize = head.size();
153  if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
154 
155  if(headerSize > sizeof(HeaderView)) {
156  nWant = headerSize - sizeof(HeaderView);
157  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
158  if(nGot != nWant) {
159  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
160  << "Failed reading streamer file, second read in readStartMessage\n";
161  }
162  } else {
163  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
164  << "Failed reading streamer file, init header size from data too small\n";
165  }
166 
167  startMsg_.reset(new InitMsgView(&headerBuf_[0]));
168  }
169 
171  if(this->readEventMessage()) {
172  return true;
173  }
174  if(multiStreams_) {
175  //Try opening next file
176  if(openNextFile()) {
177  endOfFile_ = false;
178  if(this->readEventMessage()) {
179  return true;
180  }
181  }
182  }
183  return false;
184  }
185 
187 
188  if(currentFile_ <= streamerNames_.size() - 1) {
189  FDEBUG(10) << "Opening file "
190  << streamerNames_.at(currentFile_).c_str() << std::endl;
191 
193 
194  // If start message was already there, then compare the
195  // previous and new headers
196  if(startMsg_) {
197  FDEBUG(10) << "Comparing Header" << std::endl;
198  if(!compareHeader()) {
199  return false;
200  }
201  }
202  ++currentFile_;
203  return true;
204  }
205  return false;
206  }
207 
209 
210  //Get the new header
212 
213  //Values from new Header should match up
214  if(currRun_ != startMsg_->run() ||
215  currProto_ != startMsg_->protocolVersion()) {
216  throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
217  << "File " << streamerNames_.at(currentFile_)
218  << "\nhas different run number or protocol version than previous\n";
219  return false;
220  }
221  newHeader_ = true;
222  return true;
223  }
224 
225 
227  if(endOfFile_) return 0;
228 
229  bool eventRead = false;
230  while(!eventRead) {
231 
232  IOSize nWant = sizeof(EventHeader);
233  IOSize nGot = readBytes(&eventBuf_[0], nWant);
234  if(nGot != nWant) {
235  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
236  << "Failed reading streamer file, first read in readEventMessage\n"
237  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
238  }
239  HeaderView head(&eventBuf_[0]);
240  uint32 code = head.code();
241 
242  // When we get the EOF record we know we have read all events
243  // normally and are at the end, return 0 to indicate this
244  if(code == Header::EOFRECORD) {
245  endOfFile_ = true;
246  return 0;
247  }
248  // If it is not an event nor EOFRECORD then something is wrong.
249  if(code != Header::EVENT) {
250  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
251  << "Failed reading streamer file, unknown code in event header\n"
252  << "code = " << code << "\n";
253  }
254  uint32 eventSize = head.size();
255  if(eventSize <= sizeof(EventHeader)) {
256  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
257  << "Failed reading streamer file, event header size from data too small\n";
258  }
259  eventRead = true;
260  if(eventSkipperByID_) {
261  EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
262  if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert32(evh->event_))) {
263  eventRead = false;
264  }
265  }
266  if(eventRead && numberOfEventsToSkip_ && *numberOfEventsToSkip_ > 0) {
267  eventRead = false;
268  --(*numberOfEventsToSkip_);
269  }
270  nWant = eventSize - sizeof(EventHeader);
271  if(eventRead) {
272  if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
273  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
274  if(nGot != nWant) {
275  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
276  << "Failed reading streamer file, second read in readEventMessage\n"
277  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
278  }
279  } else {
280  nGot = skipBytes(nWant);
281  if(nGot != nWant) {
282  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
283  << "Failed reading streamer file, skip event in readEventMessage\n"
284  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
285  }
286  }
287  }
288  currentEvMsg_.reset(new EventMsgView((void*)&eventBuf_[0]));
289  return 1;
290  }
291 
293  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
294  FlushMessageLog();
295  }
296 }
std::vector< char > eventBuf_
char_uint32 event_
Definition: EventMessage.h:62
static const HistoName names[]
void FlushMessageLog()
boost::shared_ptr< EventSkipperByID > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:63
uint32 code() const
Definition: MsgHeader.h:34
#define FDEBUG(lev)
Definition: DebugMacros.h:18
static StorageFactory * get(void)
boost::shared_ptr< EventMsgView > currentEvMsg_
IOOffset skipBytes(IOSize nBytes)
std::vector< std::string > streamerNames_
void clearMessage()
Definition: Exception.cc:215
boost::shared_ptr< Storage > storage_
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:61
static void check(Principal const &p, std::string const &id)
boost::shared_ptr< InitMsgView > startMsg_
uint32 size() const
Definition: MsgHeader.h:35
void logFileAction(char const *msg)
IOSize readBytes(char *buf, IOSize nBytes)
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:30
std::vector< char > headerBuf_
size_t IOSize
Definition: IOTypes.h:14
void openStreamerFile(std::string const &name)
tuple size
Write out results.
StreamerInputFile(std::string const &name, int *numberOfEventsToSkip=0, boost::shared_ptr< EventSkipperByID > eventSkipperByID=boost::shared_ptr< EventSkipperByID >())