CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
DQMProtobufReader.cc
Go to the documentation of this file.
1 #include "DQMProtobufReader.h"
2 
6 
8 // #include "FWCore/Sources/interface/ProducerSourceBase.h"
9 
11 #include <google/protobuf/io/coded_stream.h>
12 #include <google/protobuf/io/gzip_stream.h>
13 #include <google/protobuf/io/zero_copy_stream_impl.h>
14 
15 #include "TBufferFile.h"
16 
17 #include <regex>
18 #include <cstdlib>
19 
20 using namespace dqmservices;
21 
23  : PuttableSourceBase(pset, desc), fiterator_(pset) {
24  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
25  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
26  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
27  flagLoadFiles_ = pset.getUntrackedParameter<bool>("loadFiles");
28 
29  produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceDataPath");
30  produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceJsonPath");
31  produces<DQMToken, edm::Transition::BeginRun>("DQMGenerationRecoRun");
32  produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
33 }
34 
36 
39  typedef DQMFileIterator::LumiEntry LumiEntry;
40 
41  // fiterator_.logFileAction("getNextItemType");
42 
43  for (;;) {
45 
46  if (edm::shutdown_flag.load()) {
47  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
48  return InputSource::IsStop;
49  }
50 
51  // check for end of run file and force quit
52  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
53  return InputSource::IsStop;
54  }
55 
56  // check for end of run and quit if everything has been processed.
57  // this is the clean exit
58  if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
59  return InputSource::IsStop;
60  }
61 
62  // skip to the next file if we have no files openned yet
63  if (fiterator_.lumiReady()) {
64  return InputSource::IsLumi;
65  }
66 
67  fiterator_.delay();
68  // BUG: for an unknown reason it fails after a certain time if we use
69  // IsSynchronize state
70  //
71  // comment out in order to block at this level
72  // return InputSource::IsSynchronize;
73  }
74 
75  // this is unreachable
76 }
77 
78 std::shared_ptr<edm::RunAuxiliary> DQMProtobufReader::readRunAuxiliary_() {
79  // fiterator_.logFileAction("readRunAuxiliary_");
80 
82  return std::shared_ptr<edm::RunAuxiliary>(aux);
83 }
84 
86  // fiterator_.logFileAction("readRun_");
88 
90  std::vector<MonitorElement*> allMEs = store->getAllContents("");
91  for (auto const& ME : allMEs) {
92  ME->Reset();
93  }
94 }
95 
96 std::shared_ptr<edm::LuminosityBlockAuxiliary> DQMProtobufReader::readLuminosityBlockAuxiliary_() {
97  // fiterator_.logFileAction("readLuminosityBlockAuxiliary_");
98 
102 
103  return std::shared_ptr<edm::LuminosityBlockAuxiliary>(aux);
104 }
105 
107  // fiterator_.logFileAction("readLuminosityBlock_");
108 
110  jr->reportInputLumiSection(lbCache.id().run(), lbCache.id().luminosityBlock());
112 }
113 
116 
117  // clear the old lumi histograms
118  std::vector<MonitorElement*> allMEs = store->getAllContents("");
119  for (auto const& ME : allMEs) {
120  // We do not want to reset Run Products here!
121  if (ME->getLumiFlag()) {
122  ME->Reset();
123  }
124  }
125 
126  // load the new file
129 
130  std::unique_ptr<std::string> path_product(new std::string(path));
131  std::unique_ptr<std::string> json_product(new std::string(jspath));
132 
133  lb.put(std::move(path_product), "sourceDataPath");
134  lb.put(std::move(json_product), "sourceJsonPath");
135 
136  if (flagLoadFiles_) {
137  if (!std::filesystem::exists(path)) {
138  fiterator_.logFileAction("Data file is missing ", path);
139  fiterator_.logLumiState(currentLumi_, "error: data file missing");
140  return;
141  }
142 
143  fiterator_.logFileAction("Initiating request to open file ", path);
144  fiterator_.logFileAction("Successfully opened file ", path);
145  load(&*store, path);
146  fiterator_.logFileAction("Closed file ", path);
147  fiterator_.logLumiState(currentLumi_, "close: ok");
148  } else {
149  fiterator_.logFileAction("Not loading the data file at source level ", path);
150  fiterator_.logLumiState(currentLumi_, "close: not loading");
151  }
152 }
153 
155  using google::protobuf::io::ArrayInputStream;
156  using google::protobuf::io::CodedInputStream;
157  using google::protobuf::io::FileInputStream;
158  using google::protobuf::io::FileOutputStream;
159  using google::protobuf::io::GzipInputStream;
160  using google::protobuf::io::GzipOutputStream;
161 
162  int filedescriptor;
163  if ((filedescriptor = ::open(filename.c_str(), O_RDONLY)) == -1) {
164  edm::LogError("DQMProtobufReader") << "File " << filename << " does not exist.";
165  }
166 
167  dqmstorepb::ROOTFilePB dqmstore_message;
168  FileInputStream fin(filedescriptor);
169  GzipInputStream input(&fin);
170  CodedInputStream input_coded(&input);
171  input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
172  if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
173  edm::LogError("DQMProtobufReader") << "Fatal parsing file '" << filename << "'";
174  }
175 
176  ::close(filedescriptor);
177 
178  for (int i = 0; i < dqmstore_message.histo_size(); ++i) {
179  TObject* obj = nullptr;
180  dqmstorepb::ROOTFilePB::Histo const& h = dqmstore_message.histo(i);
181 
182  size_t slash = h.full_pathname().rfind('/');
183  size_t dirpos = (slash == std::string::npos ? 0 : slash);
184  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
185  std::string objname, dirname;
186  dirname.assign(h.full_pathname(), 0, dirpos);
187  objname.assign(h.full_pathname(), namepos, std::string::npos);
188  TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE);
189  buf.Reset();
190  if (buf.Length() == buf.BufferSize()) {
191  obj = nullptr;
192  } else {
193  buf.InitMap();
194  void* ptr = buf.ReadObjectAny(nullptr);
195  obj = reinterpret_cast<TObject*>(ptr);
196  }
197 
198  if (!obj) {
199  edm::LogError("DQMProtobufReader") << "Error reading element:'" << h.full_pathname();
200  }
201 
202  store->setCurrentFolder(dirname);
203 
204  if (h.flags() & DQMNet::DQM_PROP_LUMI) {
205  store->setScope(MonitorElementData::Scope::LUMI);
206  } else {
207  store->setScope(MonitorElementData::Scope::RUN);
208  }
209 
210  if (obj) {
211  int kind = h.flags() & DQMNet::DQM_PROP_TYPE_MASK;
212  if (kind == DQMNet::DQM_PROP_TYPE_INT) {
213  MonitorElement* me = store->bookInt(objname);
214  auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
215  std::regex parseint{"<.*>i=(.*)</.*>"};
216  std::smatch match;
217  bool ok = std::regex_match(expression, match, parseint);
218  if (!ok) {
219  edm::LogError("DQMProtobufReader") << "Malformed object of type INT: '" << expression << "'";
220  continue;
221  }
222  int value = std::atoi(match[1].str().c_str());
223  me->Fill(value);
224  } else if (kind == DQMNet::DQM_PROP_TYPE_REAL) {
225  MonitorElement* me = store->bookFloat(objname);
226  auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
227  std::regex parsefloat{"<.*>f=(.*)</.*>"};
228  std::smatch match;
229  bool ok = std::regex_match(expression, match, parsefloat);
230  if (!ok) {
231  edm::LogError("DQMProtobufReader") << "Malformed object of type REAL: '" << expression << "'";
232  continue;
233  }
234  double value = std::atof(match[1].str().c_str());
235  me->Fill(value);
236  } else if (kind == DQMNet::DQM_PROP_TYPE_STRING) {
237  auto value = static_cast<TObjString*>(obj)->String();
238  store->bookString(objname, value);
239  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1F) {
240  auto value = static_cast<TH1F*>(obj);
241  store->book1D(objname, value);
242  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1S) {
243  auto value = static_cast<TH1S*>(obj);
244  store->book1S(objname, value);
245  } else if (kind == DQMNet::DQM_PROP_TYPE_TH1D) {
246  auto value = static_cast<TH1D*>(obj);
247  store->book1DD(objname, value);
248  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2F) {
249  auto value = static_cast<TH2F*>(obj);
250  store->book2D(objname, value);
251  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2S) {
252  auto value = static_cast<TH2S*>(obj);
253  store->book2S(objname, value);
254  } else if (kind == DQMNet::DQM_PROP_TYPE_TH2D) {
255  auto value = static_cast<TH2D*>(obj);
256  store->book2DD(objname, value);
257  } else if (kind == DQMNet::DQM_PROP_TYPE_TH3F) {
258  auto value = static_cast<TH3F*>(obj);
259  store->book3D(objname, value);
260  } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF) {
261  auto value = static_cast<TProfile*>(obj);
262  store->bookProfile(objname, value);
263  } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF2D) {
264  auto value = static_cast<TProfile2D*>(obj);
265  store->bookProfile2D(objname, value);
266  } else {
267  edm::LogError("DQMProtobufReader") << "Unknown type: " << kind;
268  }
269  delete obj;
270  }
271  }
272 }
273 
275 
278 
279  desc.setComment(
280  "Creates runs and lumis and fills the dqmstore from protocol buffer "
281  "files.");
283 
284  desc.addUntracked<bool>("skipFirstLumis", false)
285  ->setComment(
286  "Skip (and ignore the minEventsPerLumi parameter) for the files "
287  "which have been available at the begining of the processing. "
288  "If set to true, the reader will open last available file for "
289  "processing.");
290 
291  desc.addUntracked<bool>("deleteDatFiles", false)
292  ->setComment(
293  "Delete data files after they have been closed, in order to "
294  "save disk space.");
295 
296  desc.addUntracked<bool>("endOfRunKills", false)
297  ->setComment(
298  "Kill the processing as soon as the end-of-run file appears, even if "
299  "there are/will be unprocessed lumisections.");
300 
301  desc.addUntracked<bool>("loadFiles", true)
302  ->setComment(
303  "Tells the source load the data files. If set to false, source will create skeleton lumi transitions.");
304 
306  descriptions.add("source", desc);
307 }
308 
311 
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:140
T getUntrackedParameter(std::string const &, T const &) const
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:35
MonitorElement * bookFloat(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:80
MonitorElement * bookProfile2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, double lowZ, double highZ, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:399
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:43
MonitorElement * book2S(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:219
void readEvent_(edm::EventPrincipal &) override
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:39
void load(DQMStore *store, std::string filename)
void fillLuminosityBlockPrincipal(ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
void logLumiState(const LumiEntry &lumi, const std::string &msg)
ProcessHistoryID const & processHistoryID() const
volatile std::atomic< bool > shutdown_flag
void setCurrentFolder(std::string const &fullpath) override
Definition: DQMStore.h:569
virtual MonitorElementData::Scope setScope(MonitorElementData::Scope newscope)
Definition: DQMStore.cc:46
LuminosityBlockAuxiliary const & aux() const
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:40
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:34
Log< level::Error, false > LogError
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void readRun_(edm::RunPrincipal &rpCache) override
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
Definition: DQMStore.cc:609
void readLuminosityBlock_(edm::LuminosityBlockPrincipal &lbCache) override
MonitorElement * bookString(TString const &name, TString const &value, FUNC onbooking=NOOP())
Definition: DQMStore.h:87
static std::string const input
Definition: EdmProvDump.cc:47
void Fill(long long x)
Definition: ME.h:11
void setComment(std::string const &value)
MonitorElement * book1DD(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:155
std::shared_ptr< edm::RunAuxiliary > readRunAuxiliary_() override
#define DEFINE_FWK_INPUT_SOURCE(type)
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:31
MonitorElement * bookProfile(TString const &name, TString const &title, int nchX, double lowX, double highX, int, double lowY, double highY, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:322
void put(std::unique_ptr< PROD > product)
Put a new product.
void reportInputLumiSection(unsigned int run, unsigned int lumiSectId)
Definition: JobReport.cc:466
std::shared_ptr< edm::LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
def move
Definition: eostools.py:511
RunNumber_t run() const
void beginLuminosityBlock(edm::LuminosityBlock &lb) override
MonitorElement * book1S(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:133
void logFileAction(const std::string &msg, const std::string &fileName="") const
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:36
DQMFileIterator::LumiEntry currentLumi_
static void fillDescription(ParameterSetDescription &desc)
MonitorElement * book2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:177
MonitorElement * bookInt(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:73
LuminosityBlockNumber_t luminosityBlock() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
MonitorElement * book2DD(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:261
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:329
edm::InputSource::ItemType getNextItemType() override
tuple filename
Definition: lut2db_cfg.py:20
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:38
std::pair< typename Association::data_type::first_type, double > match(Reference key, Association association, bool bestMatchByMaxValue)
Generic matching function.
Definition: Utils.h:10
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:33
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=nullptr)
Definition: RunPrincipal.cc:28
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:28
MonitorElement * book1D(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:98
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
#define str(s)
MonitorElement * book3D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, FUNC onbooking=NOOP())
Definition: DQMStore.h:290
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:61
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:32
static void fillDescription(edm::ParameterSetDescription &d)
DQMProtobufReader(edm::ParameterSet const &, edm::InputSourceDescription const &)
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:44
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:37