CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
StreamerInputFile.cc
Go to the documentation of this file.
2 
10 
13 
14 #include <iomanip>
15 #include <iostream>
16 
17 namespace edm {
18 
20 
22  std::string const& LFN,
23  std::shared_ptr<EventSkipperByID> eventSkipperByID)
24  : startMsg_(),
25  currentEvMsg_(),
26  headerBuf_(1000 * 1000),
27  eventBuf_(1000 * 1000 * 7),
28  currentFile_(0),
29  streamerNames_(),
30  multiStreams_(false),
31  currentFileName_(),
32  currentFileOpen_(false),
33  eventSkipperByID_(eventSkipperByID),
34  currRun_(0),
35  currProto_(0),
36  newHeader_(false),
37  storage_(),
38  endOfFile_(false) {
39  openStreamerFile(name, LFN);
41  }
42 
// Single-file convenience constructor: delegates to the
// (name, LFN, eventSkipperByID) constructor and passes `name` for both the
// file name and the logical file name.
43  StreamerInputFile::StreamerInputFile(std::string const& name, std::shared_ptr<EventSkipperByID> eventSkipperByID)
44  : StreamerInputFile(name, name, eventSkipperByID) {}
45 
// Multi-file constructor: keeps a copy of the catalog items in
// streamerNames_, marks the object as multi-stream, opens the first file and
// then records the run number and protocol version taken from startMsg_.
//
// names            catalog items to read; the first one is opened here.
// eventSkipperByID optional skipper stored in eventSkipperByID_ (may be null;
//                  the event-reading code tests it before use).
46  StreamerInputFile::StreamerInputFile(std::vector<FileCatalogItem> const& names,
47  std::shared_ptr<EventSkipperByID> eventSkipperByID)
48  : startMsg_(),
49  currentEvMsg_(),
// Buffers start at 1,000,000 and 7,000,000 bytes.
50  headerBuf_(1000 * 1000),
51  eventBuf_(1000 * 1000 * 7),
52  currentFile_(0),
53  streamerNames_(names),
54  multiStreams_(true),
55  currentFileName_(),
56  currentFileOpen_(false),
57  eventSkipperByID_(eventSkipperByID),
58  currRun_(0),
59  currProto_(0),
60  newHeader_(false),
61  endOfFile_(false) {
// NOTE(review): std::vector::at throws std::out_of_range when `names` is
// empty, so an empty catalog surfaces as a standard-library exception here.
62  openStreamerFile(names.at(0).fileName(0), names.at(0).logicalFileName());
// currentFile_ now indexes the next file to open.
63  ++currentFile_;
// NOTE(review): numbered line 64 is absent from this listing. startMsg_ is
// value-initialized above and dereferenced just below, so presumably the
// start message is read on the missing line -- confirm against the real file.
65  currRun_ = startMsg_->run();
66  currProto_ = startMsg_->protocolVersion();
67  }
68 
71 
73 
// Body of the routine that opens a streamer file (the exception context
// strings below name it StreamerInputFile::openStreamerFile; its signature
// line and the first statements are not part of this listing).
// Visible behaviour: throws cms::Exception("LogicalFileNameNotFound") when
// currentFileName_ is empty, logs the open request, asks StorageFactory
// whether `name` exists, throws a FileOpenError Exception when it does not,
// and on success sets currentFileOpen_ and logs again.
74  // Check if the logical file name was found.
75  if (currentFileName_.empty()) {
76  // LFN not found in catalog.
77  throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
78  << "Logical file name '" << LFN << "' was not found in the file catalog.\n"
79  << "If you wanted a local file, you forgot the 'file:' prefix\n"
80  << "before the file name in your configuration file.\n";
// NOTE(review): unreachable -- the throw above always leaves the function.
81  return;
82  }
83 
84  logFileAction(" Initiating request to open file ");
85 
86  using namespace edm::storage;
// `size` is filled in by check(); it is not read again in the visible lines.
87  IOOffset size = -1;
88  if (StorageFactory::get()->check(name, &size)) {
89  try {
// NOTE(review): numbered lines 90 and 92 are absent from this listing; the
// statement inside the try block and the declaration of `ex` used below live
// on those lines.
91  } catch (cms::Exception& e) {
93  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
// The original message is discarded and replaced by the one built below.
94  ex.clearMessage();
95  ex << "Error Opening Streamer Input File: " << name << "\n";
96  throw ex;
97  }
98  } else {
99  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
100  << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
101  }
102  currentFileOpen_ = true;
103  logFileAction(" Successfully opened file ");
104  }
105 
// Body of the close routine (signature line not part of this listing).
// Closes storage_ and logs only when a file is flagged open and storage_ is
// non-null; the open flag is cleared unconditionally, so calling this with
// nothing open is harmless.
107  if (currentFileOpen_ && storage_) {
108  storage_->close();
109  logFileAction(" Closed file ");
110  }
111  currentFileOpen_ = false;
112  }
113 
// Body of the byte-reading helper (the exception context below names it
// StreamerInputFile::readBytes; signature line not part of this listing).
// Forwards to storage_->read(buf, nBytes) and returns whatever count that
// call reports; callers in this file compare it with the requested size.
// A cms::Exception from the read is re-thrown as an Exception with category
// errors::FileReadError, wrapping the original and adding a context line.
115  storage::IOSize n = 0;
116  try {
117  n = storage_->read(buf, nBytes);
118  } catch (cms::Exception& ce) {
119  Exception ex(errors::FileReadError, "", ce);
120  ex.addContext("Calling StreamerInputFile::readBytes()");
121  throw ex;
122  }
123  return n;
124  }
125 
// Body of the byte-skipping helper (the exception context below names it
// StreamerInputFile::skipBytes; signature line not part of this listing).
// Returns the difference between the position reported after a relative seek
// of nBytes and the position reported before it, i.e. the distance actually
// moved rather than the absolute offset.
// A cms::Exception from either position() call is re-thrown as an Exception
// with category errors::FileReadError plus a context line.
127  storage::IOOffset n = 0;
128  try {
129  // We wish to return the number of bytes skipped, not the final offset.
// First call: seek by 0 from CURRENT to learn the starting offset.
130  n = storage_->position(0, storage::Storage::CURRENT);
// Second call: do the real seek and subtract the starting offset.
131  n = storage_->position(nBytes, storage::Storage::CURRENT) - n;
132  } catch (cms::Exception& ce) {
133  Exception ex(errors::FileReadError, "", ce);
134  ex.addContext("Calling StreamerInputFile::skipBytes()");
135  throw ex;
136  }
137  return n;
138  }
139 
// Body of the routine that reads the init message at the start of a file
// (the exception strings below name it StreamerInputFile::readStartMessage;
// signature line not part of this listing).
// Steps: read sizeof(HeaderView) bytes into headerBuf_, require the message
// code to be Header::INIT, grow headerBuf_ if the announced size exceeds it,
// read the remaining bytes, then point startMsg_ at a new InitMsgView over
// the buffer. Every failure throws an Exception with errors::FileReadError.
141  using namespace edm::storage;
142  IOSize nWant = sizeof(HeaderView);
143  IOSize nGot = readBytes(&headerBuf_[0], nWant);
144  if (nGot != nWant) {
145  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
146  << "Failed reading streamer file, first read in readStartMessage\n";
147  }
148 
149  HeaderView head(&headerBuf_[0]);
150  uint32 code = head.code();
151  if (code != Header::INIT) 
152  {
153  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
154  << "Expecting an init Message at start of file\n";
// NOTE(review): unreachable -- the throw above always leaves the function.
155  return;
156  }
157 
// The size is read from `head` before the possible resize below; `head` is
// not used afterwards, so a reallocation does not leave a stale view in use.
158  uint32 headerSize = head.size();
159  if (headerBuf_.size() < headerSize)
160  headerBuf_.resize(headerSize);
161 
// The announced size must be strictly larger than the fixed header part;
// anything else is rejected as "too small".
162  if (headerSize > sizeof(HeaderView)) {
163  nWant = headerSize - sizeof(HeaderView);
164  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
165  if (nGot != nWant) {
166  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
167  << "Failed reading streamer file, second read in readStartMessage\n";
168  }
169  } else {
170  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
171  << "Failed reading streamer file, init header size from data too small\n";
172  }
173 
174  startMsg_ = std::make_shared<InitMsgView>(&headerBuf_[0]); // propagate_const<T> has no reset() function
175  }
176 
// Body of the routine that reports what comes next (signature line not part
// of this listing). Returns Next::kEvent when readEventMessage() yields an
// event; otherwise, in multi-stream mode with files still pending, sets
// newHeader_ and returns Next::kFile; otherwise returns Next::kStop.
178  if (this->readEventMessage()) {
179  return Next::kEvent;
180  }
181  if (multiStreams_) {
182  //Try opening next file
// NOTE(review): `size() - 1` is unsigned and wraps for an empty vector. It is
// safe here only because multiStreams_ is set solely by the constructor that
// already called names.at(0). `currentFile_ < streamerNames_.size()` states
// the same test without the wrap hazard.
183  if (currentFile_ <= streamerNames_.size() - 1) {
184  newHeader_ = true;
185  return Next::kFile;
186  }
187  }
188  return Next::kStop;
189  }
190 
// Body of the routine that advances to the next catalog entry (signature
// line not part of this listing). If an entry remains, opens it, compares
// its header with the previous one when a start message already exists,
// advances currentFile_, clears endOfFile_ and returns true; otherwise
// returns false.
// NOTE(review): unlike the other `size() - 1` test in this file, this one is
// not guarded by multiStreams_. When streamerNames_ is empty (the single-file
// constructor value-initializes it) the unsigned subtraction wraps, the
// condition is true, and streamerNames_.at(currentFile_) throws
// std::out_of_range. `currentFile_ < streamerNames_.size()` avoids that.
192  if (currentFile_ <= streamerNames_.size() - 1) {
193  FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileNames()[0].c_str() << std::endl;
194 
195  openStreamerFile(streamerNames_.at(currentFile_).fileNames()[0],
196  streamerNames_.at(currentFile_).logicalFileName());
197 
198  // If start message was already there, then compare the
199  // previous and new headers
200  if (startMsg_) {
201  FDEBUG(10) << "Comparing Header" << std::endl;
// The boolean result is ignored; a mismatch is reported by exception.
202  compareHeader();
203  }
// Incremented only after compareHeader(), whose error message indexes
// streamerNames_ with currentFile_ to name the file just opened.
204  ++currentFile_;
205  endOfFile_ = false;
206  return true;
207  }
208  return false;
209  }
210 
// Body of the header-consistency check (the exception string below names it
// StreamerInputFile::compareHeader; signature line not part of this listing).
// Returns true when the run number and protocol version of startMsg_ match
// the stored currRun_ / currProto_; throws an Exception with
// errors::MismatchedInputFiles otherwise.
212  //Get the new header
// NOTE(review): numbered line 213 is absent from this listing; presumably the
// new start message is read there, replacing startMsg_ -- confirm.
214 
215  //Values from new Header should match up
216  if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
217  throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
218  << "File " << streamerNames_.at(currentFile_).fileNames()[0]
219  << "\nhas different run number or protocol version than previous\n";
// NOTE(review): unreachable -- the throw above always leaves the function,
// so `false` is never actually returned.
220  return false;
221  }
222  return true;
223  }
224 
// Body of the event-reading routine (the exception strings below name it
// StreamerInputFile::readEventMessage; signature line not part of this
// listing).
// Returns 0 when the end of the file has been reached (and remembers that in
// endOfFile_), 1 after an event has been loaded into eventBuf_ and
// currentEvMsg_ points at it. Events rejected by eventSkipperByID_ are
// skipped over and the loop moves on to the following one.
// Every malformed or short read throws an Exception with
// errors::FileReadError.
226  if (endOfFile_)
227  return 0;
228 
229  using namespace edm::storage;
230  bool eventRead = false;
231  while (!eventRead) {
// Step 1: read the fixed-size event header into the front of eventBuf_.
232  IOSize nWant = sizeof(EventHeader);
233  IOSize nGot = readBytes(&eventBuf_[0], nWant);
234  if (nGot == 0) {
235  // no more data available
236  endOfFile_ = true;
237  return 0;
238  }
239  if (nGot != nWant) {
240  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
241  << "Failed reading streamer file, first read in readEventMessage\n"
242  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
243  }
244  HeaderView head(&eventBuf_[0]);
245  uint32 code = head.code();
246 
247  // If it is not an event then something is wrong.
248  if (code != Header::EVENT) {
249  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
250  << "Failed reading streamer file, unknown code in event header\n"
251  << "code = " << code << "\n";
252  }
// The total size must exceed the header size, which also keeps the
// subtraction for nWant further down from wrapping.
253  uint32 eventSize = head.size();
254  if (eventSize <= sizeof(EventHeader)) {
255  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
256  << "Failed reading streamer file, event header size from data too small\n";
257  }
// Step 2: assume the event is wanted, then let the skipper veto it.
258  eventRead = true;
259  if (eventSkipperByID_) {
// NOTE(review): C-style cast reinterpreting the char buffer as EventHeader.
260  EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
261  if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
262  eventRead = false;
263  }
264  }
// Step 3: read the remaining payload, or seek past it for a skipped event.
265  nWant = eventSize - sizeof(EventHeader);
266  if (eventRead) {
// `head` and `evh` point into eventBuf_ but are not used after this resize.
267  if (eventBuf_.size() < eventSize)
268  eventBuf_.resize(eventSize);
269  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
270  if (nGot != nWant) {
271  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
272  << "Failed reading streamer file, second read in readEventMessage\n"
273  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
274  }
275  } else {
// NOTE(review): skipBytes returns a signed IOOffset that is stored in the
// unsigned IOSize nGot; a negative result would compare as a huge value and
// still hit the error branch below.
276  nGot = skipBytes(nWant);
277  if (nGot != nWant) {
278  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
279  << "Failed reading streamer file, skip event in readEventMessage\n"
280  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
281  }
282  }
283  }
284  currentEvMsg_ = std::make_shared<EventMsgView>((void*)&eventBuf_[0]); // propagate_const<T> has no reset() function
285  return 1;
286  }
287 
// Body of the file-action logger (signature line not part of this listing;
// callers in this file pass a string literal as `msg`).
// Emits one LogAbsolute record in category "fileAction" consisting of the
// current TimeOfDay(), the message text and currentFileName_, then flushes
// the message log.
289  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
290  FlushMessageLog();
291  }
292 } // namespace edm
int64_t IOOffset
Definition: IOTypes.h:20
uint64 convert64(char_uint64 v)
Definition: MsgTools.h:20
std::vector< char > eventBuf_
void FlushMessageLog()
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
char_uint32 lumi_
Definition: EventMessage.h:66
uint32 code() const
Definition: MsgHeader.h:43
#define FDEBUG(lev)
Definition: DebugMacros.h:19
StreamerInputFile(std::string const &name, std::string const &LFN, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
storage::IOOffset skipBytes(storage::IOSize nBytes)
storage::IOSize readBytes(char *buf, storage::IOSize nBytes)
std::vector< std::string > names
void clearMessage()
Definition: Exception.cc:159
void openStreamerFile(std::string const &name, std::string const &LFN)
unsigned int uint32
Definition: MsgTools.h:13
char_uint32 run_
Definition: EventMessage.h:64
size_t IOSize
Definition: IOTypes.h:15
char_uint64 event_
Definition: EventMessage.h:65
uint32 size() const
Definition: MsgHeader.h:44
void logFileAction(char const *msg)
void addContext(std::string const &context)
Definition: Exception.cc:165
tuple msg
Definition: mps_check.py:285
static void check(T const &p, std::string const &id, SelectedProducts const &iProducts, bool iVerbose)
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:28
std::vector< char > headerBuf_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
std::vector< FileCatalogItem > streamerNames_
Log< level::System, true > LogAbsolute
#define get
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
edm::propagate_const< std::unique_ptr< edm::storage::Storage > > storage_
tuple size
Write out results.