CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
DQMFileSaverPB.cc
Go to the documentation of this file.
1 #include <filesystem>
2 #include <fstream>
3 #include <iostream>
4 #include <string>
5 #include <utility>
6 #include <vector>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <unistd.h>
10 #include <boost/property_tree/json_parser.hpp>
11 #include <openssl/md5.h>
12 #include <fmt/printf.h>
13 
14 #include <google/protobuf/io/coded_stream.h>
15 #include <google/protobuf/io/gzip_stream.h>
16 #include <google/protobuf/io/zero_copy_stream_impl.h>
17 
18 #include <TString.h>
19 #include <TSystem.h>
20 #include <TBufferFile.h>
21 
22 #include "zlib.h"
28 
29 #include "DQMFileSaverPB.h"
30 
31 using namespace dqm;
32 
34  fakeFilterUnitMode_ = ps.getUntrackedParameter<bool>("fakeFilterUnitMode", false);
35  streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel", "streamDQMHistograms");
36  tag_ = ps.getUntrackedParameter<std::string>("tag", "UNKNOWN");
37 
39  mergeType_ = "";
40 
41  // If tag is set we're running in a DQM Live mode.
42  // Snapshot files will be saved for every client, then they will be merged and uploaded to the new DQM GUI.
43  if (tag_ != "UNKNOWN") {
44  streamLabel_ = "DQMLive";
45  }
46 }
47 
49 
51  if (!fakeFilterUnitMode_) {
54  }
55 
56  if (!fakeFilterUnitMode_) {
58  const std::string initFileName = daqDirector->getInitFilePath(streamLabel_);
59  std::ofstream file(initFileName);
60  file.close();
61  }
62 }
63 
65  // get from DAQ2 services where to store the files according to their format
66  namespace bpt = boost::property_tree;
67 
68  std::string openJsonFilePathName;
69  std::string jsonFilePathName;
70  std::string openHistoFilePathName;
71  std::string histoFilePathName;
72 
73  evf::FastMonitoringService* fms = nullptr;
75 
76  // create the files names
77  if (fakeFilterUnitMode_) {
78  std::string runDir = fmt::sprintf("%s/run%06d", fp.path_, fp.run_);
79  std::string baseName = "";
80  std::filesystem::create_directories(runDir);
81  // If tag is configured, append it to the name of the resulting file.
82  // This differentiates files saved by different clients.
83  // If tag is not configured, we don't add it at all to keep the old behaviour unchanged.
84  if (tag_ == "UNKNOWN") {
85  baseName = fmt::sprintf("%s/run%06d_ls%04d_%s", runDir, fp.run_, fp.lumi_, streamLabel_);
86  } else {
87  baseName = fmt::sprintf("%s/run%06d_%s_%s", runDir, fp.run_, tag_, streamLabel_);
88  }
89 
90  jsonFilePathName = baseName + ".jsn";
91  openJsonFilePathName = jsonFilePathName + ".open";
92 
93  histoFilePathName = baseName + ".pb";
94  openHistoFilePathName = histoFilePathName + ".open";
95  } else {
96  openJsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOpenOutputJsonFilePath(fp.lumi_, streamLabel_);
97  jsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(fp.lumi_, streamLabel_);
98 
99  openHistoFilePathName =
100  edm::Service<evf::EvFDaqDirector>()->getOpenProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
101  histoFilePathName = edm::Service<evf::EvFDaqDirector>()->getProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
102 
104  }
105 
106  bool abortFlag = false;
107  if (fms ? fms->getEventsProcessedForLumi(fp.lumi_, &abortFlag) : true) {
108  // Save the file in the open directory.
109  this->savePB(&*store, openHistoFilePathName, fp.run_, fp.lumi_);
110 
111  // Now move the data and json files into the output directory.
112  ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
113  }
114 
115  if (abortFlag)
116  return;
117 
118  // Write the json file in the open directory.
119  bpt::ptree pt = fillJson(fp.run_, fp.lumi_, histoFilePathName, transferDestination_, mergeType_, fms);
120  write_json(openJsonFilePathName, pt);
121  ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
122 }
123 
125  // no saving for the run
126 }
127 
128 boost::property_tree::ptree DQMFileSaverPB::fillJson(int run,
129  int lumi,
130  const std::string& dataFilePathName,
131  const std::string& transferDestinationStr,
132  const std::string& mergeTypeStr,
134  namespace bpt = boost::property_tree;
135  namespace bfs = std::filesystem;
136 
137  bpt::ptree pt;
138 
139  int hostnameReturn;
140  char host[32];
141  hostnameReturn = gethostname(host, sizeof(host));
142  if (hostnameReturn == -1)
143  throw cms::Exception("fillJson") << "Internal error, cannot get host name";
144 
145  int pid = getpid();
146  std::ostringstream oss_pid;
147  oss_pid << pid;
148 
149  int nProcessed = fms ? (fms->getEventsProcessedForLumi(lumi)) : -1;
150 
151  // Stat the data file: if not there, throw
152  std::string dataFileName;
153  struct stat dataFileStat;
154  dataFileStat.st_size = 0;
155  if (nProcessed) {
156  if (stat(dataFilePathName.c_str(), &dataFileStat) != 0)
157  throw cms::Exception("fillJson") << "Internal error, cannot get data file: " << dataFilePathName;
158  // Extract only the data file name from the full path
159  dataFileName = bfs::path(dataFilePathName).filename().string();
160  }
161  // The availability test of the FastMonitoringService was done in the ctor.
162  bpt::ptree data;
163  bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize, inputFiles, fileAdler32,
164  transferDestination, mergeType, hltErrorEvents;
165 
166  processedEvents.put("", nProcessed); // Processed events
167  acceptedEvents.put("", nProcessed); // Accepted events, same as processed for our purposes
168 
169  errorEvents.put("", 0); // Error events
170  bitmask.put("", 0); // Bitmask of abs of CMSSW return code
171  fileList.put("", dataFileName); // Data file the information refers to
172  fileSize.put("", dataFileStat.st_size); // Size in bytes of the data file
173  inputFiles.put("", ""); // We do not care about input files!
174  fileAdler32.put("", -1); // placeholder to match output json definition
175  transferDestination.put("", transferDestinationStr); // SM Transfer destination field
176  mergeType.put("", mergeTypeStr); // SM Transfer destination field
177  hltErrorEvents.put("", 0); // Error events
178 
179  data.push_back(std::make_pair("", processedEvents));
180  data.push_back(std::make_pair("", acceptedEvents));
181  data.push_back(std::make_pair("", errorEvents));
182  data.push_back(std::make_pair("", bitmask));
183  data.push_back(std::make_pair("", fileList));
184  data.push_back(std::make_pair("", fileSize));
185  data.push_back(std::make_pair("", inputFiles));
186  data.push_back(std::make_pair("", fileAdler32));
187  data.push_back(std::make_pair("", transferDestination));
188  data.push_back(std::make_pair("", mergeType));
189  data.push_back(std::make_pair("", hltErrorEvents));
190 
191  pt.add_child("data", data);
192 
193  if (fms == nullptr) {
194  pt.put("definition", "/fakeDefinition.jsn");
195  } else {
196  // The availability test of the EvFDaqDirector Service was done in the ctor.
197  bfs::path outJsonDefName{
198  edm::Service<evf::EvFDaqDirector>()->baseRunDir()}; //we assume this file is written by the EvF Output module
199  outJsonDefName /= (std::string("output_") + oss_pid.str() + std::string(".jsd"));
200  pt.put("definition", outJsonDefName.string());
201  }
202 
203  char sourceInfo[64]; //host and pid information
204  sprintf(sourceInfo, "%s_%d", host, pid);
205  pt.put("source", sourceInfo);
206 
207  return pt;
208 }
209 
212  desc.setComment("Saves histograms from DQM store, HLT->pb workflow.");
213 
214  desc.addUntracked<bool>("fakeFilterUnitMode", false)->setComment("If set, EvFDaqDirector is emulated and not used.");
215 
216  desc.addUntracked<std::string>("streamLabel", "streamDQMHistograms")->setComment("Label of the stream.");
217 
219 
220  // Changed to use addDefault instead of add here because previously
221  // DQMFileSaverOnline and DQMFileSaverPB both used the module label
222  // "saver" which caused conflicting cfi filenames to be generated.
223  // add could be used if unique module labels were given.
224  descriptions.addDefault(desc);
225 }
226 
227 void DQMFileSaverPB::savePB(DQMStore* store, std::string const& filename, int run, int lumi) const {
228  using google::protobuf::io::FileOutputStream;
229  using google::protobuf::io::GzipOutputStream;
230  using google::protobuf::io::StringOutputStream;
231 
232  unsigned int nme = 0;
233 
234  dqmstorepb::ROOTFilePB dqmstore_message;
235 
236  // We save all histograms, regardless of the lumi flag: even though we save per lumi, this is a *snapshot*.
237  auto mes = store->getAllContents("");
238  for (auto const me : mes) {
239  TBufferFile buffer(TBufferFile::kWrite);
240  if (me->kind() < MonitorElement::Kind::TH1F) {
241  TObjString object(me->tagString().c_str());
242  buffer.WriteObject(&object);
243  } else {
244  buffer.WriteObject(me->getRootObject());
245  }
246  dqmstorepb::ROOTFilePB::Histo& histo = *dqmstore_message.add_histo();
247  histo.set_full_pathname(me->getFullname());
248  uint32_t flags = 0;
249  flags |= (uint32_t)me->kind();
250  if (me->getLumiFlag())
251  flags |= DQMNet::DQM_PROP_LUMI;
252  if (me->getEfficiencyFlag())
254  histo.set_flags(flags);
255  histo.set_size(buffer.Length());
256 
257  if (tag_ == "UNKNOWN") {
258  histo.set_streamed_histo((void const*)buffer.Buffer(), buffer.Length());
259  } else {
260  // Compress ME blob with zlib
261  int maxOutputSize = this->getMaxCompressedSize(buffer.Length());
262  char compression_output[maxOutputSize];
263  uLong total_out = this->compressME(buffer, maxOutputSize, compression_output);
264  histo.set_streamed_histo(compression_output, total_out);
265  }
266 
267  // Save quality reports
268  for (const auto& qr : me->getQReports()) {
270  // TODO: 64 is likely too short; memory corruption in the old code?
271  char buf[64];
272  std::snprintf(buf, sizeof(buf), "qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
273  result = '<' + me->getName() + '.' + qr->getQRName() + '>';
274  result += buf;
275  result += qr->getAlgorithm() + ':' + qr->getMessage();
276  result += "</" + me->getName() + '.' + qr->getQRName() + '>';
277  TObjString str(result.c_str());
278 
279  dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
280  TBufferFile qr_buffer(TBufferFile::kWrite);
281  qr_buffer.WriteObject(&str);
282  qr_histo.set_full_pathname(me->getFullname() + '.' + qr->getQRName());
283  qr_histo.set_flags(static_cast<uint32_t>(MonitorElement::Kind::STRING));
284  qr_histo.set_size(qr_buffer.Length());
285  // qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
286 
287  if (tag_ == "UNKNOWN") {
288  qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
289  } else {
290  // Compress ME blob with zlib
291  int maxOutputSize = this->getMaxCompressedSize(qr_buffer.Length());
292  char compression_output[maxOutputSize];
293  uLong total_out = this->compressME(qr_buffer, maxOutputSize, compression_output);
294  qr_histo.set_streamed_histo(compression_output, total_out);
295  }
296  }
297 
298  // Save efficiency tag, if any.
299  // XXX not supported by protobuf files.
300 
301  // Save tag if any.
302  // XXX not supported by protobuf files.
303 
304  // Count saved histograms
305  ++nme;
306  }
307 
308  int filedescriptor =
309  ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
310  FileOutputStream file_stream(filedescriptor);
311  if (tag_ == "UNKNOWN") {
313  options.format = GzipOutputStream::GZIP;
314  options.compression_level = 1;
315  GzipOutputStream gzip_stream(&file_stream, options);
316  dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
317 
318  // Flush the internal streams & Close the file descriptor
319  gzip_stream.Close();
320  file_stream.Close();
321  } else {
322  // We zlib compressed individual MEs so no need to compress the entire file again.
323  dqmstore_message.SerializeToZeroCopyStream(&file_stream);
324 
325  // Flush the internal stream & Close the file descriptor
326  file_stream.Close();
327  }
328 
329  // Maybe make some noise.
330  edm::LogInfo("DQMFileSaverPB") << "savePB: successfully wrote " << nme << " objects "
331  << "into DQM file '" << filename << "'\n";
332 }
333 
334 int DQMFileSaverPB::getMaxCompressedSize(int bufferSize) const {
335  // When input data is very badly compressable, zlib will add overhead instead of reducing the size.
336  // There is a minor amount of overhead (6 bytes overall and 5 bytes per 16K block) that is taken
337  // into consideration here to find out potential absolute maximum size of the output.
338  int n16kBlocks = (bufferSize + 16383) / 16384; // round up any fraction of a block
339  int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
340  return maxOutputSize;
341 }
342 
343 ulong DQMFileSaverPB::compressME(const TBufferFile& buffer, int maxOutputSize, char* compression_output) const {
344  z_stream deflateStream;
345  deflateStream.zalloc = Z_NULL;
346  deflateStream.zfree = Z_NULL;
347  deflateStream.opaque = Z_NULL;
348  deflateStream.avail_in = (uInt)buffer.Length() + 1; // size of input, string + terminator
349  deflateStream.next_in = (Bytef*)buffer.Buffer(); // input array
350  deflateStream.avail_out = (uInt)maxOutputSize; // size of output
351  deflateStream.next_out = (Bytef*)compression_output; // output array, result will be placed here
352 
353  // The actual compression
354  deflateInit(&deflateStream, Z_BEST_COMPRESSION);
355  deflate(&deflateStream, Z_FINISH);
356  deflateEnd(&deflateStream);
357 
358  return deflateStream.total_out;
359 }
360 
std::string streamLabel_
T getUntrackedParameter(std::string const &, T const &) const
std::string transferDestination_
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::string getInitFilePath(std::string const &stream) const
DQMFileSaverPB(const edm::ParameterSet &ps)
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:66
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
Definition: DQMStore.cc:609
tuple result
Definition: mps_fire.py:311
~DQMFileSaverPB() override
void setComment(std::string const &value)
void initRun() const override
void addDefault(ParameterSetDescription const &psetDescription)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ulong compressME(const TBufferFile &buffer, int maxOutputSize, char *compression_output) const
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
void savePB(DQMStore *store, std::string const &filename, int run, int lumi) const
list lumi
Definition: dqmdumpme.py:53
Log< level::Info, false > LogInfo
string host
Definition: query.py:115
void saveLumi(const FileParameters &fp) const override
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
tuple filename
Definition: lut2db_cfg.py:20
void saveRun(const FileParameters &fp) const override
static void fillDescription(edm::ParameterSetDescription &d)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
#define str(s)
static boost::property_tree::ptree fillJson(int run, int lumi, const std::string &dataFilePathName, const std::string &transferDestinationStr, const std::string &mergeTypeStr, evf::FastMonitoringService *fms)
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:63
std::string mergeType_
int getMaxCompressedSize(int bufferSize) const