CMS 3D CMS Logo

StreamerInputFile.cc
Go to the documentation of this file.
2 
10 
13 
14 #include <iomanip>
15 #include <iostream>
16 
17 namespace edm {
18 
20 
22  std::string const& LFN,
23  std::shared_ptr<EventSkipperByID> eventSkipperByID)
24  : startMsg_(),
25  currentEvMsg_(),
26  headerBuf_(1000 * 1000),
27  eventBuf_(1000 * 1000 * 7),
28  currentFile_(0),
29  streamerNames_(),
30  multiStreams_(false),
31  currentFileName_(),
32  currentFileOpen_(false),
33  eventSkipperByID_(eventSkipperByID),
34  currRun_(0),
35  currProto_(0),
36  newHeader_(false),
37  storage_(),
38  endOfFile_(false) {
39  openStreamerFile(name, LFN);
41  }
42 
43  StreamerInputFile::StreamerInputFile(std::string const& name, std::shared_ptr<EventSkipperByID> eventSkipperByID)
44  : StreamerInputFile(name, name, eventSkipperByID) {}
45 
46  StreamerInputFile::StreamerInputFile(std::vector<FileCatalogItem> const& names,
47  std::shared_ptr<EventSkipperByID> eventSkipperByID)
48  : startMsg_(),
49  currentEvMsg_(),
50  headerBuf_(1000 * 1000),
51  eventBuf_(1000 * 1000 * 7),
52  currentFile_(0),
53  streamerNames_(names),
54  multiStreams_(true),
55  currentFileName_(),
56  currentFileOpen_(false),
57  eventSkipperByID_(eventSkipperByID),
58  currRun_(0),
59  currProto_(0),
60  newHeader_(false),
61  endOfFile_(false) {
62  openStreamerFile(names.at(0).fileName(0), names.at(0).logicalFileName());
63  ++currentFile_;
65  currRun_ = startMsg_->run();
66  currProto_ = startMsg_->protocolVersion();
67  }
68 
71 
73 
74  // Check if the logical file name was found.
75  if (currentFileName_.empty()) {
76  // LFN not found in catalog.
77  throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
78  << "Logical file name '" << LFN << "' was not found in the file catalog.\n"
79  << "If you wanted a local file, you forgot the 'file:' prefix\n"
80  << "before the file name in your configuration file.\n";
81  return;
82  }
83 
84  logFileAction(" Initiating request to open file ");
85 
86  IOOffset size = -1;
87  if (StorageFactory::get()->check(name, &size)) {
88  try {
90  } catch (cms::Exception& e) {
92  ex.addContext("Calling StreamerInputFile::openStreamerFile()");
93  ex.clearMessage();
94  ex << "Error Opening Streamer Input File: " << name << "\n";
95  throw ex;
96  }
97  } else {
98  throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
99  << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
100  }
101  currentFileOpen_ = true;
102  logFileAction(" Successfully opened file ");
103  }
104 
106  if (currentFileOpen_ && storage_) {
107  storage_->close();
108  logFileAction(" Closed file ");
109  }
110  currentFileOpen_ = false;
111  }
112 
114  IOSize n = 0;
115  try {
116  n = storage_->read(buf, nBytes);
117  } catch (cms::Exception& ce) {
118  Exception ex(errors::FileReadError, "", ce);
119  ex.addContext("Calling StreamerInputFile::readBytes()");
120  throw ex;
121  }
122  return n;
123  }
124 
126  IOOffset n = 0;
127  try {
128  // We wish to return the number of bytes skipped, not the final offset.
129  n = storage_->position(0, Storage::CURRENT);
130  n = storage_->position(nBytes, Storage::CURRENT) - n;
131  } catch (cms::Exception& ce) {
132  Exception ex(errors::FileReadError, "", ce);
133  ex.addContext("Calling StreamerInputFile::skipBytes()");
134  throw ex;
135  }
136  return n;
137  }
138 
140  IOSize nWant = sizeof(HeaderView);
141  IOSize nGot = readBytes(&headerBuf_[0], nWant);
142  if (nGot != nWant) {
143  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
144  << "Failed reading streamer file, first read in readStartMessage\n";
145  }
146 
147  HeaderView head(&headerBuf_[0]);
148  uint32 code = head.code();
149  if (code != Header::INIT)
150  {
151  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
152  << "Expecting an init Message at start of file\n";
153  return;
154  }
155 
156  uint32 headerSize = head.size();
157  if (headerBuf_.size() < headerSize)
158  headerBuf_.resize(headerSize);
159 
160  if (headerSize > sizeof(HeaderView)) {
161  nWant = headerSize - sizeof(HeaderView);
162  nGot = readBytes(&headerBuf_[sizeof(HeaderView)], nWant);
163  if (nGot != nWant) {
164  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
165  << "Failed reading streamer file, second read in readStartMessage\n";
166  }
167  } else {
168  throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
169  << "Failed reading streamer file, init header size from data too small\n";
170  }
171 
172  startMsg_ = std::make_shared<InitMsgView>(&headerBuf_[0]); // propagate_const<T> has no reset() function
173  }
174 
176  if (this->readEventMessage()) {
177  return Next::kEvent;
178  }
179  if (multiStreams_) {
180  //Try opening next file
181  if (currentFile_ <= streamerNames_.size() - 1) {
182  newHeader_ = true;
183  return Next::kFile;
184  }
185  }
186  return Next::kStop;
187  }
188 
190  if (currentFile_ <= streamerNames_.size() - 1) {
191  FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileNames()[0].c_str() << std::endl;
192 
193  openStreamerFile(streamerNames_.at(currentFile_).fileNames()[0],
194  streamerNames_.at(currentFile_).logicalFileName());
195 
196  // If start message was already there, then compare the
197  // previous and new headers
198  if (startMsg_) {
199  FDEBUG(10) << "Comparing Header" << std::endl;
200  compareHeader();
201  }
202  ++currentFile_;
203  endOfFile_ = false;
204  return true;
205  }
206  return false;
207  }
208 
210  //Get the new header
212 
213  //Values from new Header should match up
214  if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
215  throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
216  << "File " << streamerNames_.at(currentFile_).fileNames()[0]
217  << "\nhas different run number or protocol version than previous\n";
218  return false;
219  }
220  return true;
221  }
222 
224  if (endOfFile_)
225  return 0;
226 
227  bool eventRead = false;
228  while (!eventRead) {
229  IOSize nWant = sizeof(EventHeader);
230  IOSize nGot = readBytes(&eventBuf_[0], nWant);
231  if (nGot == 0) {
232  // no more data available
233  endOfFile_ = true;
234  return 0;
235  }
236  if (nGot != nWant) {
237  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
238  << "Failed reading streamer file, first read in readEventMessage\n"
239  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
240  }
241  HeaderView head(&eventBuf_[0]);
242  uint32 code = head.code();
243 
244  // If it is not an event then something is wrong.
245  if (code != Header::EVENT) {
246  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
247  << "Failed reading streamer file, unknown code in event header\n"
248  << "code = " << code << "\n";
249  }
250  uint32 eventSize = head.size();
251  if (eventSize <= sizeof(EventHeader)) {
252  throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
253  << "Failed reading streamer file, event header size from data too small\n";
254  }
255  eventRead = true;
256  if (eventSkipperByID_) {
257  EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
258  if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
259  eventRead = false;
260  }
261  }
262  nWant = eventSize - sizeof(EventHeader);
263  if (eventRead) {
264  if (eventBuf_.size() < eventSize)
265  eventBuf_.resize(eventSize);
266  nGot = readBytes(&eventBuf_[sizeof(EventHeader)], nWant);
267  if (nGot != nWant) {
268  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
269  << "Failed reading streamer file, second read in readEventMessage\n"
270  << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
271  }
272  } else {
273  nGot = skipBytes(nWant);
274  if (nGot != nWant) {
275  throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
276  << "Failed reading streamer file, skip event in readEventMessage\n"
277  << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
278  }
279  }
280  }
281  currentEvMsg_ = std::make_shared<EventMsgView>((void*)&eventBuf_[0]); // propagate_const<T> has no reset() function
282  return 1;
283  }
284 
286  LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
287  FlushMessageLog();
288  }
289 } // namespace edm
edm::StreamerInputFile::Next
Next
Definition: StreamerInputFile.h:34
edm::LogAbsolute
Definition: MessageLogger.h:469
edm::errors::MismatchedInputFiles
Definition: EDMException.h:52
MessageLogger.h
edm::StreamerInputFile::newHeader_
bool newHeader_
Definition: StreamerInputFile.h:83
funct::false
false
Definition: Factorize.h:34
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::StreamerInputFile::currProto_
uint32 currProto_
Definition: StreamerInputFile.h:81
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::TimeOfDay
Definition: TimeOfDay.h:9
edm::StreamerInputFile::openNextFile
bool openNextFile()
Definition: StreamerInputFile.cc:189
edm
HLT enums.
Definition: AlignableModifier.h:19
StorageFactory::open
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
Definition: StorageFactory.cc:122
edm::StreamerInputFile::readEventMessage
int readEventMessage()
Definition: StreamerInputFile.cc:223
EventHeader
Definition: EventHeader.h:18
mps_check.msg
tuple msg
Definition: mps_check.py:285
edm::StreamerInputFile::Next::kEvent
InputFileCatalog.h
edm::StreamerInputFile::skipBytes
IOOffset skipBytes(IOSize nBytes)
Definition: StreamerInputFile.cc:125
edm::StreamerInputFile::currentFileName_
std::string currentFileName_
Definition: StreamerInputFile.h:75
edm::StreamerInputFile::currentEvMsg_
edm::propagate_const< std::shared_ptr< EventMsgView > > currentEvMsg_
Definition: StreamerInputFile.h:67
Header::INIT
Definition: MsgHeader.h:15
edm::StreamerInputFile::streamerNames_
std::vector< FileCatalogItem > streamerNames_
Definition: StreamerInputFile.h:73
uint32
unsigned int uint32
Definition: MsgTools.h:13
edm::errors::FileOpenError
Definition: EDMException.h:49
EventSkipperByID.h
edm::StreamerInputFile::~StreamerInputFile
~StreamerInputFile()
Definition: StreamerInputFile.cc:19
edm::Exception
Definition: EDMException.h:77
edm::StreamerInputFile
Definition: StreamerInputFile.h:19
EDMException.h
EventHeader::lumi_
char_uint32 lumi_
Definition: EventMessage.h:66
Header::EVENT
Definition: MsgHeader.h:16
names
const std::string names[nVars_]
Definition: PhotonIDValueMapProducer.cc:122
IOFlags.h
StorageFactory::get
static const StorageFactory * get(void)
Definition: StorageFactory.cc:26
edm::StreamerInputFile::storage_
edm::propagate_const< std::unique_ptr< Storage > > storage_
Definition: StreamerInputFile.h:85
edm::StreamerInputFile::startMsg_
edm::propagate_const< std::shared_ptr< InitMsgView > > startMsg_
Definition: StreamerInputFile.h:66
edm::StreamerInputFile::compareHeader
bool compareHeader()
Definition: StreamerInputFile.cc:209
IOFlags::OpenRead
Definition: IOFlags.h:7
convert32
uint32 convert32(char_uint32 v)
Definition: MsgTools.h:28
StreamerInputFile.h
HeaderView
Definition: MsgHeader.h:35
edm::StreamerInputFile::closeStreamerFile
void closeStreamerFile()
Definition: StreamerInputFile.cc:105
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
funct::true
true
Definition: Factorize.h:173
edm::FlushMessageLog
void FlushMessageLog()
Definition: MessageLogger.cc:94
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
IOOffset
int64_t IOOffset
Definition: IOTypes.h:19
EventHeader::run_
char_uint32 run_
Definition: EventMessage.h:64
StorageFactory.h
HeaderView::code
uint32 code() const
Definition: MsgHeader.h:43
edm::StreamerInputFile::headerBuf_
std::vector< char > headerBuf_
Definition: StreamerInputFile.h:69
convert64
uint64 convert64(char_uint64 v)
Definition: MsgTools.h:20
edm::StreamerInputFile::readStartMessage
void readStartMessage()
Definition: StreamerInputFile.cc:139
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
edm::StreamerInputFile::eventSkipperByID_
edm::propagate_const< std::shared_ptr< EventSkipperByID > > eventSkipperByID_
Definition: StreamerInputFile.h:78
edm::StreamerInputFile::Next::kStop
EventHeader::event_
char_uint64 event_
Definition: EventMessage.h:65
edm::StreamerInputFile::currRun_
uint32 currRun_
Definition: StreamerInputFile.h:80
edm::check
static void check(T const &p, std::string const &id, SelectedProducts const &iProducts)
Definition: GetProductCheckerOutputModule.cc:80
HeaderView::size
uint32 size() const
Definition: MsgHeader.h:44
edm::StreamerInputFile::Next::kFile
Exception
Definition: hltDiff.cc:246
edm::StreamerInputFile::openStreamerFile
void openStreamerFile(std::string const &name, std::string const &LFN)
Definition: StreamerInputFile.cc:69
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
edm::StreamerInputFile::next
Next next()
Definition: StreamerInputFile.cc:175
edm::StreamerInputFile::multiStreams_
bool multiStreams_
Definition: StreamerInputFile.h:74
edm::StreamerInputFile::endOfFile_
bool endOfFile_
Definition: StreamerInputFile.h:87
Exception.h
edm::StreamerInputFile::currentFile_
unsigned int currentFile_
Definition: StreamerInputFile.h:72
Storage::CURRENT
Definition: Storage.h:22
edm::StreamerInputFile::logFileAction
void logFileAction(char const *msg)
Definition: StreamerInputFile.cc:285
DebugMacros.h
cms::Exception
Definition: Exception.h:70
edm::StreamerInputFile::StreamerInputFile
StreamerInputFile(std::string const &name, std::string const &LFN, std::shared_ptr< EventSkipperByID > eventSkipperByID=std::shared_ptr< EventSkipperByID >())
Definition: StreamerInputFile.cc:21
edm::StreamerInputFile::readBytes
IOSize readBytes(char *buf, IOSize nBytes)
Definition: StreamerInputFile.cc:113
edm::StreamerInputFile::eventBuf_
std::vector< char > eventBuf_
Definition: StreamerInputFile.h:70
edm::StreamerInputFile::currentFileOpen_
bool currentFileOpen_
Definition: StreamerInputFile.h:76
TimeOfDay.h
IOSize
size_t IOSize
Definition: IOTypes.h:14
cms::Exception::clearMessage
void clearMessage()
Definition: Exception.cc:159
edm::errors::FileReadError
Definition: EDMException.h:50
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37