CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputFile.cc
Go to the documentation of this file.
2 
9 
12 
13 #include <iomanip>
14 #include <iostream>
15 
16 namespace edm {
17 
20  }
21 
// Single-file constructor: initialises every member and opens the one input
// file given by `name` (see the openStreamerFile(name) call below).
// NOTE(review): this listing has dropped the first line of the signature
// (embedded line 22) and embedded line 40 inside the body. The reference
// index at the end of this page gives the signature as
//   StreamerInputFile(std::string const &name,
//                     std::shared_ptr<EventSkipperByID> eventSkipperByID = ...)
// Whatever statement sat on line 40 is not visible here -- confirm against
// the real file before relying on this block.
23  std::shared_ptr<EventSkipperByID> eventSkipperByID) :
24  startMsg_(),
25  currentEvMsg_(),
// Initial buffer sizes (in chars); both buffers are grown on demand by the
// read routines when a message header announces a larger size.
26  headerBuf_(1000*1000),
27  eventBuf_(1000*1000*7),
28  currentFile_(0),
// No list of names in single-file mode; multiStreams_ == false means the
// "try the next file" branch of the event-reading path is never taken.
29  streamerNames_(),
30  multiStreams_(false),
31  currentFileName_(),
32  currentFileOpen_(false),
// Optional filter; a null pointer means no event is skipped by ID.
33  eventSkipperByID_(eventSkipperByID),
34  currRun_(0),
35  currProto_(0),
36  newHeader_(false),
37  storage_(),
38  endOfFile_(false) {
// Throws (via openStreamerFile) if the file cannot be checked or opened.
39  openStreamerFile(name);
41  }
42 
// Multi-file constructor: keeps the whole list of file names, opens the first
// one, and records the run number and protocol version of its start message
// so that later files can be compared against them.
//
// names            - files to read in order; names.at(0) throws
//                    std::out_of_range if the list is empty.
// eventSkipperByID - optional filter; a null pointer means no event is
//                    skipped by ID.
43  StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
44  std::shared_ptr<EventSkipperByID> eventSkipperByID) :
45  startMsg_(),
46  currentEvMsg_(),
// Initial buffer sizes (in chars); grown on demand by the read routines.
47  headerBuf_(1000*1000),
48  eventBuf_(1000*1000*7),
49  currentFile_(0),
50  streamerNames_(names),
// multiStreams_ == true enables advancing to the next file once the current
// one runs out of events.
51  multiStreams_(true),
52  currentFileName_(),
53  currentFileOpen_(false),
54  eventSkipperByID_(eventSkipperByID),
55  currRun_(0),
56  currProto_(0),
57  newHeader_(false),
// NOTE(review): storage_ is not named in this initializer list, unlike the
// single-file constructor above, so it is default-initialised implicitly.
58  endOfFile_(false) {
59  openStreamerFile(names.at(0));
// currentFile_ now indexes the next file to be opened, not the open one.
60  ++currentFile_;
// NOTE(review): embedded line 61 is missing from this listing. startMsg_ is
// dereferenced immediately below, so the missing statement presumably reads
// the start message of the file just opened -- confirm against the real file.
62  currRun_ = startMsg_->run();
63  currProto_ = startMsg_->protocolVersion();
64  }
65 
// Opens the streamer file `name` through StorageFactory and stores the handle
// in storage_. Logs before the attempt and after success, and sets
// currentFileOpen_ on success.
//
// Throws an exception with errors::FileOpenError when check() reports the
// file as unavailable; an exception raised while opening is rethrown with the
// message "Error Opening Streamer Input File: <name>".
//
// NOTE(review): this listing has dropped embedded lines 67, 69, 71, 78 and 81
// (the signature, two statements at the top of the body, the tail of the
// open() call and the declaration of `ex`). The reference index at the end of
// this page gives the signature as
//   void openStreamerFile(std::string const &name)
// The remaining missing lines cannot be reconstructed from here.
66  void
68 
70 
72  logFileAction(" Initiating request to open file ");
73 
// `size` is only handed to check(); the visible code never reads it back.
74  IOOffset size = -1;
75  if(StorageFactory::get()->check(name.c_str(), &size)) {
76  try {
77  storage_ =StorageFactory::get()->open(name.c_str(),
79  }
80  catch(cms::Exception& e) {
// Replace the low-level message with a file-level one before rethrowing.
82  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
83  ex.clearMessage();
84  ex << "Error Opening Streamer Input File: " << name << "\n";
85  throw ex;
86  }
87  } else {
88  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
89  << "Error Opening Streamer Input File, file does not exist: "
90  << name << "\n";
91  }
// Only reached when the open succeeded; both failure paths throw.
92  currentFileOpen_ = true;
93  logFileAction(" Successfully opened file ");
94  }
95 
// Closes the current storage, if a file is marked open and a storage object
// exists, and logs the action. currentFileOpen_ is cleared unconditionally,
// so calling this with nothing open is harmless.
// NOTE(review): the signature line (embedded line 97) is missing from this
// listing; the reference index at the end of this page shows
//   void closeStreamerFile()
96  void
98  if(currentFileOpen_ && storage_) {
99  storage_->close();
100  logFileAction(" Closed file ");
101  }
102  currentFileOpen_ = false;
103  }
104 
// Reads from storage_ into `buf`, requesting `nBytes`, and returns the count
// reported by storage_->read(). Callers compare that count with the request
// to detect short reads. A cms::Exception from the storage layer is rethrown
// as errors::FileReadError with added context.
// NOTE(review): the signature line (embedded line 105) is missing from this
// listing; the reference index at the end of this page shows
//   IOSize readBytes(char *buf, IOSize nBytes)
106  IOSize n = 0;
107  try {
108  n = storage_->read(buf, nBytes);
109  }
110  catch(cms::Exception& ce) {
// Wrap the original exception so the caller sees where the read was issued.
111  Exception ex(errors::FileReadError, "", ce);
112  ex.addContext("Calling StreamerInputFile::readBytes()");
113  throw ex;
114  }
115  return n;
116  }
117 
// Advances the position of storage_ by `nBytes` relative to the current
// position and returns how far it actually moved (new offset minus old
// offset). A cms::Exception from the storage layer is rethrown as
// errors::FileReadError with added context.
// NOTE(review): the signature line (embedded line 118) is missing from this
// listing; the reference index at the end of this page shows
//   IOOffset skipBytes(IOSize nBytes)
119  IOOffset n = 0;
120  try {
121  // We wish to return the number of bytes skipped, not the final offset.
// First call: a zero-length relative seek, used to obtain the starting offset.
122  n = storage_->position(0, Storage::CURRENT);
// Second call: the real seek; subtracting the starting offset gives the distance.
123  n = storage_->position(nBytes, Storage::CURRENT) - n;
124  }
125  catch(cms::Exception& ce) {
126  Exception ex(errors::FileReadError, "", ce);
127  ex.addContext("Calling StreamerInputFile::skipBytes()");
128  throw ex;
129  }
130  return n;
131  }
132 
// Reads the init ("start") message at the current file position into
// headerBuf_ and points startMsg_ at it.
//
// The read happens in two steps: first sizeof(HeaderView) bytes to learn the
// message code and total size, then the remainder of the message.
// Throws errors::FileReadError on a short read, when the code is not
// Header::INIT, or when the announced size is not larger than the fixed header.
// NOTE(review): the signature line (embedded line 133) is missing from this
// listing; the name is taken from the exception categories used below.
134  IOSize nWant = sizeof(HeaderView);
135  IOSize nGot = readBytes(&headerBuf_[0], nWant);
136  if(nGot != nWant) {
137  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
138  << "Failed reading streamer file, first read in readStartMessage\n";
139  }
140 
141  HeaderView head(&headerBuf_[0]);
142  uint32 code = head.code();
143  if(code != Header::INIT)
144  {
145  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
146  << "Expecting an init Message at start of file\n";
// NOTE(review): unreachable -- the throw above always leaves the function.
147  return;
148  }
149 
// The size is read from `head` before the buffer may be reallocated by
// resize(); `head` is not used again after this point.
150  uint32 headerSize = head.size();
151  if(headerBuf_.size() < headerSize) headerBuf_.resize(headerSize);
152 
153  if(headerSize > sizeof(HeaderView)) {
// Read the rest of the message directly behind the fixed-size header.
154  nWant = headerSize - sizeof(HeaderView);
155  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
156  if(nGot != nWant) {
157  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
158  << "Failed reading streamer file, second read in readStartMessage\n";
159  }
160  } else {
// A size equal to the fixed header alone is also rejected here.
161  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
162  << "Failed reading streamer file, init header size from data too small\n";
163  }
164 
165  startMsg_ = std::make_shared<InitMsgView>(&headerBuf_[0]); // propagate_const<T> has no reset() function
166  }
167 
// Advances to the next event message. Returns true when an event was read,
// false when no further event is available.
// NOTE(review): the signature line (embedded line 168) is missing from this
// listing, so the function name and exact return type are not visible here;
// the body returns true/false -- confirm the declaration in the header.
169  if(this->readEventMessage()) {
170  return true;
171  }
// The current file is exhausted. In multi-file mode, move on to the next file.
172  if(multiStreams_) {
173  //Try opening next file
174  if(openNextFile()) {
// Clear the end-of-file latch so readEventMessage() reads from the new file.
175  endOfFile_ = false;
176  if(this->readEventMessage()) {
177  return true;
178  }
// NOTE(review): if the newly opened file yields no event, the function falls
// through to `return false` without trying any further file in the list.
179  }
180  }
181  return false;
182  }
183 
185 
// Moves on to the file at index currentFile_ in streamerNames_, if there is
// one. Returns true when that file was accepted (currentFile_ is then
// incremented), false when the list is exhausted or compareHeader() returns
// false.
// NOTE(review): the signature line (embedded line 184) and embedded line 190
// are missing from this listing. Line 190 sits right after the "Opening file"
// debug output, so it presumably performs the actual open -- confirm against
// the real file.
// NOTE(review): streamerNames_.size() - 1 is unsigned arithmetic; with an
// empty list it wraps to a huge value, the condition holds, and .at() throws
// std::out_of_range. `currentFile_ < streamerNames_.size()` would avoid that.
186  if(currentFile_ <= streamerNames_.size() - 1) {
187  FDEBUG(10) << "Opening file "
188  << streamerNames_.at(currentFile_).c_str() << std::endl;
189 
191 
192  // If start message was already there, then compare the
193  // previous and new headers
194  if(startMsg_) {
195  FDEBUG(10) << "Comparing Header" << std::endl;
196  if(!compareHeader()) {
197  return false;
198  }
199  }
// Only advance the index once the new file has been accepted.
200  ++currentFile_;
201  return true;
202  }
203  return false;
204  }
205 
207 
// Checks that the start message now held in startMsg_ carries the same run
// number and protocol version as recorded in currRun_ / currProto_.
// Returns true and sets newHeader_ when they match; throws
// errors::MismatchedInputFiles otherwise.
// NOTE(review): the signature line (embedded line 206) and embedded line 209
// are missing from this listing. Going by the comment directly below, line
// 209 presumably re-reads the start message from the newly opened file --
// confirm against the real file.
208  //Get the new header
210 
211  //Values from new Header should match up
212  if(currRun_ != startMsg_->run() ||
213  currProto_ != startMsg_->protocolVersion()) {
214  throw Exception(errors::MismatchedInputFiles,"StreamerInputFile::compareHeader")
215  << "File " << streamerNames_.at(currentFile_)
216  << "\nhas different run number or protocol version than previous\n";
// NOTE(review): unreachable -- the throw above always leaves the function,
// so in the visible code this function never actually returns false.
217  return false;
218  }
219  newHeader_ = true;
220  return true;
221  }
222 
223 
// Reads the next event message that is not rejected by eventSkipperByID_ into
// eventBuf_ and points currentEvMsg_ at it.
//
// Returns 1 when an event was read, 0 when the end of the file was reached
// (endOfFile_ is then latched, and later calls return 0 immediately).
// Throws errors::FileReadError on a short read or skip, on a message whose
// code is not Header::EVENT, or on an announced size that is not larger than
// the fixed event header.
// NOTE(review): the signature line (embedded line 224) is missing from this
// listing; the name is taken from the exception categories used below.
225  if(endOfFile_) return 0;
226 
// Loop until an event passes the skip filter; each pass consumes one message.
227  bool eventRead = false;
228  while(!eventRead) {
229 
// Step 1: read the fixed-size event header.
230  IOSize nWant = sizeof(EventHeader);
231  IOSize nGot = readBytes(&eventBuf_[0], nWant);
232  if(nGot == 0) {
233  // no more data available
234  endOfFile_ = true;
235  return 0;
236  }
// A partial header (some bytes, but fewer than requested) is an error, not EOF.
237  if(nGot != nWant) {
238  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
239  << "Failed reading streamer file, first read in readEventMessage\n"
240  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
241  }
242  HeaderView head(&eventBuf_[0]);
243  uint32 code = head.code();
244 
245  // If it is not an event then something is wrong.
246  if(code != Header::EVENT) {
247  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
248  << "Failed reading streamer file, unknown code in event header\n"
249  << "code = " << code << "\n";
250  }
251  uint32 eventSize = head.size();
252  if(eventSize <= sizeof(EventHeader)) {
253  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
254  << "Failed reading streamer file, event header size from data too small\n";
255  }
// Step 2: decide whether to keep this event. Without a skipper every event is kept.
256  eventRead = true;
257  if(eventSkipperByID_) {
// Reinterpret the start of the buffer as the on-disk header to get run/lumi/event.
258  EventHeader *evh = (EventHeader *)(&eventBuf_[0]);
259  if(eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
260  eventRead = false;
261  }
262  }
// Step 3: consume the rest of the message, either by reading it or seeking past it.
263  nWant = eventSize - sizeof(EventHeader);
264  if(eventRead) {
// Grow the buffer first; `head` and `evh` are not used after this possible reallocation.
265  if(eventBuf_.size() < eventSize) eventBuf_.resize(eventSize);
266  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
267  if(nGot != nWant) {
268  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
269  << "Failed reading streamer file, second read in readEventMessage\n"
270  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
271  }
272  } else {
// NOTE(review): skipBytes() returns a signed IOOffset that is stored in the
// unsigned IOSize nGot here; a negative result would be converted implicitly.
// It would still differ from nWant and so be reported by the check below.
273  nGot = skipBytes(nWant);
274  if(nGot != nWant) {
275  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
276  << "Failed reading streamer file, skip event in readEventMessage\n"
277  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
278  }
279  }
280  }
281  currentEvMsg_ = std::make_shared<EventMsgView>((void*)&eventBuf_[0]); // propagate_const<T> has no reset() function
282  return 1;
283  }
284 
// Emits one "fileAction" log record made of the current time of day, the
// text in `msg` and the current file name, then flushes the message log.
// NOTE(review): the signature line (embedded line 285) is missing from this
// listing; the reference index at the end of this page shows
//   void logFileAction(char const *msg)
286  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
287  FlushMessageLog();
288  }
289 }
uint64 convert64(char_uint64 v)
Definition: MsgTools.h:21
std::vector< char > eventBuf_
static const HistoName names[]
void FlushMessageLog()
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:67
uint32 code() const
Definition: MsgHeader.h:34
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void closeStreamerFile()
Needs to be public because of forking.
static const StorageFactory * get(void)
IOOffset skipBytes(IOSize nBytes)
std::vector< std::string > streamerNames_
void clearMessage()
Definition: Exception.cc:215
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:65
char_uint64 event_
Definition: EventMessage.h:66
uint32 size() const
Definition: MsgHeader.h:35
void logFileAction(char const *msg)
IOSize readBytes(char *buf, IOSize nBytes)
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:30
std::vector< char > headerBuf_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
size_t IOSize
Definition: IOTypes.h:14
volatile std::atomic< bool > shutdown_flag false
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
StreamerInputFile(std::string const &name, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
void openStreamerFile(std::string const &name)
tuple size
Write out results.
static void check(T const &p, std::string const &id, SelectedProducts const &iProducts)
edm::propagate_const< std::unique_ptr< Storage > > storage_
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const