Go to the documentation of this file.00001 #include "IOPool/Streamer/interface/MsgTools.h"
00002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
00003 #include "IOPool/Streamer/src/StreamerFileReader.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "FWCore/Catalog/interface/InputFileCatalog.h"
00007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00010 #include "FWCore/Sources/interface/EventSkipperByID.h"
00011
00012 namespace edm {
00013
00014 StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc) :
00015 StreamerInputSource(pset, desc),
00016 streamerNames_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames")),
00017 streamReader_(),
00018 eventSkipperByID_(EventSkipperByID::create(pset).release()),
00019 initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")) {
00020 InputFileCatalog catalog(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"), pset.getUntrackedParameter<std::string>("overrideCatalog"));
00021 streamerNames_ = catalog.fileNames();
00022 reset_();
00023
00024 }
00025
00026 StreamerFileReader::~StreamerFileReader() {
00027 }
00028
00029 void
00030 StreamerFileReader::reset_() {
00031 if (streamerNames_.size() > 1) {
00032 streamReader_ = std::unique_ptr<StreamerInputFile>(new StreamerInputFile(streamerNames_, eventSkipperByID_));
00033 } else if (streamerNames_.size() == 1) {
00034 streamReader_ = std::unique_ptr<StreamerInputFile>(new StreamerInputFile(streamerNames_.at(0), eventSkipperByID_));
00035 } else {
00036 throw Exception(errors::FileReadError, "StreamerFileReader::StreamerFileReader")
00037 << "No fileNames were specified\n";
00038 }
00039 InitMsgView const* header = getHeader();
00040 deserializeAndMergeWithRegistry(*header, false);
00041 if(initialNumberOfEventsToSkip_) {
00042 skip(initialNumberOfEventsToSkip_);
00043 }
00044 }
00045
00046
00047 bool StreamerFileReader::checkNextEvent() {
00048 EventMsgView const* eview = getNextEvent();
00049
00050 if (newHeader()) {
00051
00052
00053
00054 InitMsgView const* header = getHeader();
00055 deserializeAndMergeWithRegistry(*header, true);
00056 }
00057 if (eview == nullptr) {
00058 return false;
00059 }
00060 deserializeEvent(*eview);
00061 return true;
00062 }
00063
00064 void
00065 StreamerFileReader::skip(int toSkip) {
00066 for(int i = 0; i != toSkip; ++i) {
00067 EventMsgView const* evMsg = getNextEvent();
00068 if(evMsg == nullptr) {
00069 return;
00070 }
00071
00072 if(eventSkipperByID_ && eventSkipperByID_->skipIt(evMsg->run(), evMsg->lumi(), evMsg->event())) {
00073 --i;
00074 }
00075 }
00076 }
00077
00078 void
00079 StreamerFileReader::closeFile_() {
00080 if(streamReader_.get() != nullptr) streamReader_->closeStreamerFile();
00081 }
00082
00083 bool
00084 StreamerFileReader::newHeader() {
00085 return streamReader_->newHeader();
00086 }
00087
00088 InitMsgView const*
00089 StreamerFileReader::getHeader() {
00090
00091 InitMsgView const* header = streamReader_->startMessage();
00092
00093 if(header->code() != Header::INIT) {
00094 throw Exception(errors::FileReadError, "StreamerFileReader::readHeader")
00095 << "received wrong message type: expected INIT, got "
00096 << header->code() << "\n";
00097 }
00098 return header;
00099 }
00100
00101 EventMsgView const*
00102 StreamerFileReader::getNextEvent() {
00103 if (!streamReader_->next()) {
00104 return nullptr;
00105 }
00106 return streamReader_->currentRecord();
00107 }
00108
00109 void
00110 StreamerFileReader::fillDescriptions(ConfigurationDescriptions& descriptions) {
00111 ParameterSetDescription desc;
00112 desc.setComment("Reads events from streamer files.");
00113 desc.addUntracked<std::vector<std::string> >("fileNames")
00114 ->setComment("Names of files to be processed.");
00115 desc.addUntracked<unsigned int>("skipEvents", 0U)
00116 ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
00117 desc.addUntracked<std::string>("overrideCatalog", std::string());
00118
00119 desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
00120 StreamerInputSource::fillDescription(desc);
00121 EventSkipperByID::fillDescription(desc);
00122 descriptions.add("source", desc);
00123 }
00124 }
00125