CMS 3D CMS Logo

DQMProtobufReader.cc
Go to the documentation of this file.
1 #include "DQMProtobufReader.h"
2 
14 
15 #include <cstdlib>
16 #include <fcntl.h>
17 #include <filesystem>
18 #include <regex>
19 
20 #include <TBufferFile.h>
21 
22 #include <google/protobuf/io/coded_stream.h>
23 #include <google/protobuf/io/gzip_stream.h>
24 #include <google/protobuf/io/zero_copy_stream_impl.h>
25 
26 using namespace dqmservices;
27 
30  fiterator_(pset),
31  flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
32  flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
33  flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
34  flagLoadFiles_(pset.getUntrackedParameter<bool>("loadFiles")) {
35  produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceDataPath");
36  produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceJsonPath");
37  produces<DQMToken, edm::Transition::BeginRun>("DQMGenerationRecoRun");
38  produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
39 }
40 
43  typedef DQMFileIterator::LumiEntry LumiEntry;
44 
45  // fiterator_.logFileAction("getNextItemType");
46 
47  for (;;) {
49 
50  if (edm::shutdown_flag.load()) {
51  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
52  return InputSource::ItemType::IsStop;
53  }
54 
55  // check for end of run file and force quit
56  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
57  return InputSource::ItemType::IsStop;
58  }
59 
60  // check for end of run and quit if everything has been processed.
61  // this is the clean exit
62  if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
63  return InputSource::ItemType::IsStop;
64  }
65 
66  // skip to the next file if we have no files opened yet
67  if (fiterator_.lumiReady()) {
68  return InputSource::ItemType::IsLumi;
69  }
70 
71  fiterator_.delay();
72  // BUG: for an unknown reason it fails after a certain time if we use
73  // IsSynchronize state
74  //
75  // the return below is commented out in order to block at this level
76  // return InputSource::ItemType::IsSynchronize;
77  }
78 
79  // this is unreachable
80 }
81 
82 std::shared_ptr<edm::RunAuxiliary> DQMProtobufReader::readRunAuxiliary_() {
83  // fiterator_.logFileAction("readRunAuxiliary_");
84 
86  return std::shared_ptr<edm::RunAuxiliary>(aux);
87 }
88 
90  // fiterator_.logFileAction("readRun_");
92 
94  std::vector<MonitorElement*> allMEs = store->getAllContents("");
95  for (auto const& ME : allMEs) {
96  ME->Reset();
97  }
98 }
99 
100 std::shared_ptr<edm::LuminosityBlockAuxiliary> DQMProtobufReader::readLuminosityBlockAuxiliary_() {
101  // fiterator_.logFileAction("readLuminosityBlockAuxiliary_");
102 
106 
107  return std::shared_ptr<edm::LuminosityBlockAuxiliary>(aux);
108 }
109 
111  // fiterator_.logFileAction("readLuminosityBlock_");
112 
114  jr->reportInputLumiSection(lbCache.id().run(), lbCache.id().luminosityBlock());
116 }
117 
120 
121  // clear the old lumi histograms
122  std::vector<MonitorElement*> allMEs = store->getAllContents("");
123  for (auto const& ME : allMEs) {
124  // We do not want to reset Run Products here!
125  if (ME->getLumiFlag()) {
126  ME->Reset();
127  }
128  }
129 
130  // load the new file
133 
134  std::unique_ptr<std::string> path_product(new std::string(path));
135  std::unique_ptr<std::string> json_product(new std::string(jspath));
136 
137  lb.put(std::move(path_product), "sourceDataPath");
138  lb.put(std::move(json_product), "sourceJsonPath");
139 
140  if (flagLoadFiles_) {
141  if (!std::filesystem::exists(path)) {
142  fiterator_.logFileAction("Data file is missing ", path);
143  fiterator_.logLumiState(currentLumi_, "error: data file missing");
144  return;
145  }
146 
147  fiterator_.logFileAction("Initiating request to open file ", path);
148  fiterator_.logFileAction("Successfully opened file ", path);
149  load(&*store, path);
150  fiterator_.logFileAction("Closed file ", path);
151  fiterator_.logLumiState(currentLumi_, "close: ok");
152  } else {
153  fiterator_.logFileAction("Not loading the data file at source level ", path);
154  fiterator_.logLumiState(currentLumi_, "close: not loading");
155  }
156 }
157 
159  using google::protobuf::io::ArrayInputStream;
160  using google::protobuf::io::CodedInputStream;
161  using google::protobuf::io::FileInputStream;
162  using google::protobuf::io::FileOutputStream;
163  using google::protobuf::io::GzipInputStream;
164  using google::protobuf::io::GzipOutputStream;
165 
166  int filedescriptor;
167  if ((filedescriptor = ::open(filename.c_str(), O_RDONLY)) == -1) {
168  edm::LogError("DQMProtobufReader") << "File " << filename << " does not exist.";
169  }
170 
171  dqmstorepb::ROOTFilePB dqmstore_message;
172  FileInputStream fin(filedescriptor);
173  GzipInputStream input(&fin);
174  CodedInputStream input_coded(&input);
175  input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
176  if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
177  edm::LogError("DQMProtobufReader") << "Fatal parsing file '" << filename << "'";
178  }
179 
180  ::close(filedescriptor);
181 
182  for (int i = 0; i < dqmstore_message.histo_size(); ++i) {
183  TObject* obj = nullptr;
184  dqmstorepb::ROOTFilePB::Histo const& h = dqmstore_message.histo(i);
185 
186  size_t slash = h.full_pathname().rfind('/');
187  size_t dirpos = (slash == std::string::npos ? 0 : slash);
188  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
189  std::string objname, dirname;
190  dirname.assign(h.full_pathname(), 0, dirpos);
191  objname.assign(h.full_pathname(), namepos, std::string::npos);
192  TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE);
193  buf.Reset();
194  if (buf.Length() == buf.BufferSize()) {
195  obj = nullptr;
196  } else {
197  buf.InitMap();
198  void* ptr = buf.ReadObjectAny(nullptr);
199  obj = reinterpret_cast<TObject*>(ptr);
200  }
201 
202  if (!obj) {
203  edm::LogError("DQMProtobufReader") << "Error reading element:'" << h.full_pathname();
204  }
205 
206  store->setCurrentFolder(dirname);
207 
208  if (h.flags() & DQMNet::DQM_PROP_LUMI) {
209  store->setScope(MonitorElementData::Scope::LUMI);
210  } else {
211  store->setScope(MonitorElementData::Scope::RUN);
212  }
213 
214  if (obj) {
215  int kind = h.flags() & DQMNet::DQM_PROP_TYPE_MASK;
217  MonitorElement* me = store->bookInt(objname);
218  auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
219  std::regex parseint{"<.*>i=(.*)</.*>"};
220  std::smatch match;
221  bool ok = std::regex_match(expression, match, parseint);
222  if (!ok) {
223  edm::LogError("DQMProtobufReader") << "Malformed object of type INT: '" << expression << "'";
224  continue;
225  }
226  int value = std::atoi(match[1].str().c_str());
227  me->Fill(value);
228  } else if (kind == DQMNet::DQM_PROP_TYPE_REAL) {
229  MonitorElement* me = store->bookFloat(objname);
230  auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
231  std::regex parsefloat{"<.*>f=(.*)</.*>"};
232  std::smatch match;
233  bool ok = std::regex_match(expression, match, parsefloat);
234  if (!ok) {
235  edm::LogError("DQMProtobufReader") << "Malformed object of type REAL: '" << expression << "'";
236  continue;
237  }
238  double value = std::atof(match[1].str().c_str());
239  me->Fill(value);
240  } else if (kind == DQMNet::DQM_PROP_TYPE_STRING) {
241  auto value = static_cast<TObjString*>(obj)->String();
242  store->bookString(objname, value);
243  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1F) {
244  auto value = static_cast<TH1F*>(obj);
245  store->book1D(objname, value);
246  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1S) {
247  auto value = static_cast<TH1S*>(obj);
248  store->book1S(objname, value);
249  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1D) {
250  auto value = static_cast<TH1D*>(obj);
251  store->book1DD(objname, value);
252  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1I) {
253  auto value = static_cast<TH1I*>(obj);
254  store->book1I(objname, value);
255  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2F) {
256  auto value = static_cast<TH2F*>(obj);
257  store->book2D(objname, value);
258  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2S) {
259  auto value = static_cast<TH2S*>(obj);
260  store->book2S(objname, value);
261  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2D) {
262  auto value = static_cast<TH2D*>(obj);
263  store->book2DD(objname, value);
264  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2I) {
265  auto value = static_cast<TH2I*>(obj);
266  store->book2I(objname, value);
267  } else if (kind == DQMNet::DQM_PROP_TYPE_TH3F) {
268  auto value = static_cast<TH3F*>(obj);
269  store->book3D(objname, value);
270  } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF) {
271  auto value = static_cast<TProfile*>(obj);
272  store->bookProfile(objname, value);
273  } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF2D) {
274  auto value = static_cast<TProfile2D*>(obj);
275  store->bookProfile2D(objname, value);
276  } else {
277  edm::LogError("DQMProtobufReader") << "Unknown type: " << kind;
278  }
279  delete obj;
280  }
281  }
282 }
283 
285 
288 
289  desc.setComment("Creates runs and lumis and fills the dqmstore from protocol buffer files.");
291 
292  desc.addUntracked<bool>("skipFirstLumis", false)
293  ->setComment(
294  "Skip (and ignore the minEventsPerLumi parameter) for the files which have been available at the begining of "
295  "the processing. If set to true, the reader will open last available file for processing.");
296 
297  desc.addUntracked<bool>("deleteDatFiles", false)
298  ->setComment("Delete data files after they have been closed, in order to save disk space.");
299 
300  desc.addUntracked<bool>("endOfRunKills", false)
301  ->setComment(
302  "Kill the processing as soon as the end-of-run file appears, even if there are/will be unprocessed "
303  "lumisections.");
304 
305  desc.addUntracked<bool>("loadFiles", true)
306  ->setComment(
307  "Tells the source to load the data files. If set to False, the source will create skeleton lumi "
308  "transitions.");
309 
311  descriptions.add("source", desc);
312 }
313 
316 
edm::InputSource::ItemTypeInfo getNextItemType() override
LuminosityBlockNumber_t luminosityBlock() const
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:34
MonitorElement * bookFloat(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:80
MonitorElement * bookProfile2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, double lowZ, double highZ, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:485
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:44
MonitorElement * book1I(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:186
MonitorElement * book2S(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:263
void readEvent_(edm::EventPrincipal &) override
unsigned int runNumber() const
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:39
void load(DQMStore *store, std::string filename)
void fillLuminosityBlockPrincipal(ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
void logLumiState(const LumiEntry &lumi, const std::string &msg)
ProcessHistoryID const & processHistoryID() const
volatile std::atomic< bool > shutdown_flag
void setCurrentFolder(std::string const &fullpath) override
Definition: DQMStore.h:656
virtual MonitorElementData::Scope setScope(MonitorElementData::Scope newscope)
Definition: DQMStore.cc:50
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:41
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:33
Log< level::Error, false > LogError
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void readRun_(edm::RunPrincipal &rpCache) override
void readLuminosityBlock_(edm::LuminosityBlockPrincipal &lbCache) override
MonitorElement * bookString(TString const &name, TString const &value, FUNC onbooking=NOOP())
Definition: DQMStore.h:87
static std::string const input
Definition: EdmProvDump.cc:50
Definition: ME.h:11
MonitorElement * book1DD(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:155
std::shared_ptr< edm::RunAuxiliary > readRunAuxiliary_() override
#define DEFINE_FWK_INPUT_SOURCE(type)
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:30
MonitorElement * bookProfile(TString const &name, TString const &title, int nchX, double lowX, double highX, int, double lowY, double highY, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:408
void put(std::unique_ptr< PROD > product)
Put a new product.
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
Definition: DQMStore.cc:641
void reportInputLumiSection(unsigned int run, unsigned int lumiSectId)
Definition: JobReport.cc:501
std::shared_ptr< edm::LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
void beginLuminosityBlock(edm::LuminosityBlock &lb) override
MonitorElement * book1S(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:133
Definition: value.py:1
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:35
static const uint32_t DQM_PROP_TYPE_TH1I
Definition: DQMNet.h:36
DQMFileIterator::LumiEntry currentLumi_
RunNumber_t run() const
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
void logFileAction(const std::string &msg, const std::string &fileName="") const
static void fillDescription(ParameterSetDescription &desc)
const ::dqmstorepb::ROOTFilePB_Histo & histo(int index) const
MonitorElement * book2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:221
MonitorElement * bookInt(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:73
void add(std::string const &label, ParameterSetDescription const &psetDescription)
MonitorElement * book2DD(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:347
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:38
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:32
LuminosityBlockAuxiliary const & aux() const
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=nullptr)
Definition: RunPrincipal.cc:25
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:27
MonitorElement * book1D(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:98
The Signals That Services Can Subscribe To. This is based on ActivityRegistry.h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
#define str(s)
MonitorElement * book3D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, FUNC onbooking=NOOP())
Definition: DQMStore.h:376
MonitorElement * book2I(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:305
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:62
static const uint32_t DQM_PROP_TYPE_TH2I
Definition: DQMNet.h:40
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:31
static void fillDescription(edm::ParameterSetDescription &d)
DQMProtobufReader(edm::ParameterSet const &, edm::InputSourceDescription const &)
def move(src, dest)
Definition: eostools.py:511
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:45
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:37