CMS 3D CMS Logo

DQMFileSaverPB.cc
Go to the documentation of this file.
5 
6 #include "DQMFileSaverPB.h"
7 
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 #include <iostream>
12 #include <vector>
13 #include <string>
14 #include <fstream>
15 #include <utility>
16 #include "zlib.h"
17 #include "TString.h"
18 #include "TSystem.h"
19 #include "TBufferFile.h"
20 
21 #include <openssl/md5.h>
22 #include <boost/property_tree/json_parser.hpp>
23 #include <boost/filesystem.hpp>
24 #include <boost/format.hpp>
25 
26 #include <google/protobuf/io/coded_stream.h>
27 #include <google/protobuf/io/gzip_stream.h>
28 #include <google/protobuf/io/zero_copy_stream_impl.h>
30 
31 using namespace dqm;
32 
34  fakeFilterUnitMode_ = ps.getUntrackedParameter<bool>("fakeFilterUnitMode", false);
35  streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel", "streamDQMHistograms");
36  tag_ = ps.getUntrackedParameter<std::string>("tag", "UNKNOWN");
37 
39  mergeType_ = "";
40 
41  // If tag is set we're running in a DQM Live mode.
42  // Snapshot files will be saved for every client, then they will be merged and uploaded to the new DQM GUI.
43  if (tag_ != "UNKNOWN") {
44  streamLabel_ = "DQMLive";
45  }
46 }
47 
49 
51  if (!fakeFilterUnitMode_) {
54  }
55 
56  if (!fakeFilterUnitMode_) {
58  const std::string initFileName = daqDirector->getInitFilePath(streamLabel_);
59  std::ofstream file(initFileName);
60  file.close();
61  }
62 }
63 
65  // get from DAQ2 services where to store the files according to their format
66  namespace bpt = boost::property_tree;
67 
68  std::string openJsonFilePathName;
69  std::string jsonFilePathName;
70  std::string openHistoFilePathName;
71  std::string histoFilePathName;
72 
73  evf::FastMonitoringService* fms = nullptr;
75 
76  // create the files names
77  if (fakeFilterUnitMode_) {
78  std::string runDir = str(boost::format("%s/run%06d") % fp.path_ % fp.run_);
79  std::string baseName = "";
80  boost::filesystem::create_directories(runDir);
81  // If tag is configured, append it to the name of the resulting file.
82  // This differentiates files saved by different clients.
83  // If tag is not configured, we don't add it at all to keep the old behaviour unchanged.
84  if (tag_ == "UNKNOWN") {
85  baseName = str(boost::format("%s/run%06d_ls%04d_%s") % runDir % fp.run_ % fp.lumi_ % streamLabel_);
86  }
87  else {
88  baseName = str(boost::format("%s/run%06d_%s_%s") % runDir % fp.run_ % tag_ % streamLabel_);
89  }
90 
91  jsonFilePathName = baseName + ".jsn";
92  openJsonFilePathName = jsonFilePathName + ".open";
93 
94  histoFilePathName = baseName + ".pb";
95  openHistoFilePathName = histoFilePathName + ".open";
96  } else {
97  openJsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOpenOutputJsonFilePath(fp.lumi_, streamLabel_);
98  jsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(fp.lumi_, streamLabel_);
99 
100  openHistoFilePathName =
101  edm::Service<evf::EvFDaqDirector>()->getOpenProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
102  histoFilePathName = edm::Service<evf::EvFDaqDirector>()->getProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
103 
105  }
106 
107  if (fms ? fms->getEventsProcessedForLumi(fp.lumi_) : true) {
108  // Save the file in the open directory.
109  this->savePB(&*store, openHistoFilePathName, fp.run_, fp.lumi_);
110 
111  // Now move the data and json files into the output directory.
112  ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
113  }
114 
115  // Write the json file in the open directory.
116  bpt::ptree pt = fillJson(fp.run_, fp.lumi_, histoFilePathName, transferDestination_, mergeType_, fms);
117  write_json(openJsonFilePathName, pt);
118  ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
119 }
120 
122  // no saving for the run
123 }
124 
125 boost::property_tree::ptree DQMFileSaverPB::fillJson(int run,
126  int lumi,
127  const std::string& dataFilePathName,
128  const std::string& transferDestinationStr,
129  const std::string& mergeTypeStr,
131  namespace bpt = boost::property_tree;
132  namespace bfs = boost::filesystem;
133 
134  bpt::ptree pt;
135 
136  int hostnameReturn;
137  char host[32];
138  hostnameReturn = gethostname(host, sizeof(host));
139  if (hostnameReturn == -1)
140  throw cms::Exception("fillJson") << "Internal error, cannot get host name";
141 
142  int pid = getpid();
143  std::ostringstream oss_pid;
144  oss_pid << pid;
145 
146  int nProcessed = fms ? (fms->getEventsProcessedForLumi(lumi)) : -1;
147 
148  // Stat the data file: if not there, throw
149  std::string dataFileName;
150  struct stat dataFileStat;
151  dataFileStat.st_size = 0;
152  if (nProcessed) {
153  if (stat(dataFilePathName.c_str(), &dataFileStat) != 0)
154  throw cms::Exception("fillJson") << "Internal error, cannot get data file: " << dataFilePathName;
155  // Extract only the data file name from the full path
156  dataFileName = bfs::path(dataFilePathName).filename().string();
157  }
158  // The availability test of the FastMonitoringService was done in the ctor.
159  bpt::ptree data;
160  bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize, inputFiles, fileAdler32,
161  transferDestination, mergeType, hltErrorEvents;
162 
163  processedEvents.put("", nProcessed); // Processed events
164  acceptedEvents.put("", nProcessed); // Accepted events, same as processed for our purposes
165 
166  errorEvents.put("", 0); // Error events
167  bitmask.put("", 0); // Bitmask of abs of CMSSW return code
168  fileList.put("", dataFileName); // Data file the information refers to
169  fileSize.put("", dataFileStat.st_size); // Size in bytes of the data file
170  inputFiles.put("", ""); // We do not care about input files!
171  fileAdler32.put("", -1); // placeholder to match output json definition
172  transferDestination.put("", transferDestinationStr); // SM Transfer destination field
173  mergeType.put("", mergeTypeStr); // SM Transfer destination field
174  hltErrorEvents.put("", 0); // Error events
175 
176  data.push_back(std::make_pair("", processedEvents));
177  data.push_back(std::make_pair("", acceptedEvents));
178  data.push_back(std::make_pair("", errorEvents));
179  data.push_back(std::make_pair("", bitmask));
180  data.push_back(std::make_pair("", fileList));
181  data.push_back(std::make_pair("", fileSize));
182  data.push_back(std::make_pair("", inputFiles));
183  data.push_back(std::make_pair("", fileAdler32));
184  data.push_back(std::make_pair("", transferDestination));
185  data.push_back(std::make_pair("", mergeType));
186  data.push_back(std::make_pair("", hltErrorEvents));
187 
188  pt.add_child("data", data);
189 
190  if (fms == nullptr) {
191  pt.put("definition", "/fakeDefinition.jsn");
192  } else {
193  // The availability test of the EvFDaqDirector Service was done in the ctor.
194  bfs::path outJsonDefName{
195  edm::Service<evf::EvFDaqDirector>()->baseRunDir()}; //we assume this file is written bu the EvF Output module
196  outJsonDefName /= (std::string("output_") + oss_pid.str() + std::string(".jsd"));
197  pt.put("definition", outJsonDefName.string());
198  }
199 
200  char sourceInfo[64]; //host and pid information
201  sprintf(sourceInfo, "%s_%d", host, pid);
202  pt.put("source", sourceInfo);
203 
204  return pt;
205 }
206 
209  desc.setComment("Saves histograms from DQM store, HLT->pb workflow.");
210 
211  desc.addUntracked<bool>("fakeFilterUnitMode", false)->setComment("If set, EvFDaqDirector is emulated and not used.");
212 
213  desc.addUntracked<std::string>("streamLabel", "streamDQMHistograms")->setComment("Label of the stream.");
214 
216 
217  // Changed to use addDefault instead of add here because previously
218  // DQMFileSaverOnline and DQMFileSaverPB both used the module label
219  // "saver" which caused conflicting cfi filenames to be generated.
220  // add could be used if unique module labels were given.
221  descriptions.addDefault(desc);
222 }
223 
224 void DQMFileSaverPB::savePB(DQMStore* store, std::string const& filename, int run, int lumi) const {
225  using google::protobuf::io::FileOutputStream;
226  using google::protobuf::io::GzipOutputStream;
227  using google::protobuf::io::StringOutputStream;
228 
229  unsigned int nme = 0;
230 
231  dqmstorepb::ROOTFilePB dqmstore_message;
232 
233  // We save all histograms, regardless of the lumi flag: even though we save per lumi, this is a *snapshot*.
234  auto mes = store->getAllContents("");
235  for (auto const me : mes) {
236  TBufferFile buffer(TBufferFile::kWrite);
237  if (me->kind() < MonitorElement::Kind::TH1F) {
238  TObjString object(me->tagString().c_str());
239  buffer.WriteObject(&object);
240  } else {
241  buffer.WriteObject(me->getRootObject());
242  }
243  dqmstorepb::ROOTFilePB::Histo& histo = *dqmstore_message.add_histo();
244  histo.set_full_pathname(me->getFullname());
245  uint32_t flags = 0;
246  flags |= (uint32_t)me->kind();
247  if (me->getLumiFlag())
249  if (me->getEfficiencyFlag())
251  histo.set_flags(flags);
252  histo.set_size(buffer.Length());
253 
254  if (tag_ == "UNKNOWN") {
255  histo.set_streamed_histo((void const*)buffer.Buffer(), buffer.Length());
256  }
257  else {
258  // Compress ME blob with zlib
259  int maxOutputSize = this->getMaxCompressedSize(buffer.Length());
260  char compression_output[maxOutputSize];
261  uLong total_out = this->compressME(buffer, maxOutputSize, compression_output);
262  histo.set_streamed_histo(compression_output, total_out);
263  }
264 
265  // Save quality reports
266  for (const auto& qr : me->getQReports()) {
268  // TODO: 64 is likely too short; memory corruption in the old code?
269  char buf[64];
270  std::snprintf(buf, sizeof(buf), "qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
271  result = '<' + me->getName() + '.' + qr->getQRName() + '>';
272  result += buf;
273  result += qr->getAlgorithm() + ':' + qr->getMessage();
274  result += "</" + me->getName() + '.' + qr->getQRName() + '>';
275  TObjString str(result.c_str());
276 
277  dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
278  TBufferFile qr_buffer(TBufferFile::kWrite);
279  qr_buffer.WriteObject(&str);
280  qr_histo.set_full_pathname(me->getFullname() + '.' + qr->getQRName());
281  qr_histo.set_flags(static_cast<uint32_t>(MonitorElement::Kind::STRING));
282  qr_histo.set_size(qr_buffer.Length());
283  // qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
284 
285  if (tag_ == "UNKNOWN") {
286  qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
287  }
288  else {
289  // Compress ME blob with zlib
290  int maxOutputSize = this->getMaxCompressedSize(qr_buffer.Length());
291  char compression_output[maxOutputSize];
292  uLong total_out = this->compressME(qr_buffer, maxOutputSize, compression_output);
293  qr_histo.set_streamed_histo(compression_output, total_out);
294  }
295  }
296 
297  // Save efficiency tag, if any.
298  // XXX not supported by protobuf files.
299 
300  // Save tag if any.
301  // XXX not supported by protobuf files.
302 
303  // Count saved histograms
304  ++nme;
305  }
306 
307  int filedescriptor =
308  ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
309  FileOutputStream file_stream(filedescriptor);
310  if (tag_ == "UNKNOWN") {
312  options.format = GzipOutputStream::GZIP;
313  options.compression_level = 1;
314  GzipOutputStream gzip_stream(&file_stream, options);
315  dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
316 
317  // Flush the internal streams
318  gzip_stream.Close();
319  file_stream.Close();
320  }
321  else {
322  // We zlib compressed individual MEs so no need to compress the entire file again.
323  dqmstore_message.SerializeToZeroCopyStream(&file_stream);
324 
325  // Flush the internal stream
326  file_stream.Close();
327  }
328 
329  // Close the file descriptor
330  ::close(filedescriptor);
331 
332  // Maybe make some noise.
333  edm::LogInfo("DQMFileSaverPB") << "savePB: successfully wrote " << nme << " objects "
334  << "into DQM file '" << filename << "'\n";
335 }
336 
337 int DQMFileSaverPB::getMaxCompressedSize(int bufferSize) const {
338  // When input data is very badly compressable, zlib will add overhead instead of reducing the size.
339  // There is a minor amount of overhead (6 bytes overall and 5 bytes per 16K block) that is taken
340  // into consideration here to find out potential absolute maximum size of the output.
341  int n16kBlocks = (bufferSize + 16383) / 16384; // round up any fraction of a block
342  int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
343  return maxOutputSize;
344 }
345 
346 ulong DQMFileSaverPB::compressME(const TBufferFile& buffer, int maxOutputSize, char* compression_output) const {
347  z_stream deflateStream;
348  deflateStream.zalloc = Z_NULL;
349  deflateStream.zfree = Z_NULL;
350  deflateStream.opaque = Z_NULL;
351  deflateStream.avail_in = (uInt)buffer.Length() + 1; // size of input, string + terminator
352  deflateStream.next_in = (Bytef *)buffer.Buffer(); // input array
353  deflateStream.avail_out = (uInt)maxOutputSize; // size of output
354  deflateStream.next_out = (Bytef *)compression_output; // output array, result will be placed here
355 
356  // The actual compression
357  deflateInit(&deflateStream, Z_BEST_COMPRESSION);
358  deflate(&deflateStream, Z_FINISH);
359  deflateEnd(&deflateStream);
360 
361  return deflateStream.total_out;
362 }
363 
resolutioncreator_cfi.object
object
Definition: resolutioncreator_cfi.py:4
dqm::DQMFileSaverBase::FileParameters
Definition: DQMFileSaverBase.h:33
Options
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
Definition: FWExpressionValidator.cc:28
evf::FastMonitoringService::getEventsProcessedForLumi
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
Definition: FastMonitoringService.cc:708
DiDispStaMuonMonitor_cfi.pt
pt
Definition: DiDispStaMuonMonitor_cfi.py:39
dqmstorepb::ROOTFilePB::add_histo
::dqmstorepb::ROOTFilePB_Histo * add_histo()
Definition: ROOTFilePB.pb.h:615
LuminosityBlock.h
dqm::DQMFileSaverBase::filename
static const std::string filename(const FileParameters &fp, bool useLumi=false)
Definition: DQMFileSaverBase.cc:91
MonitorElementData::Kind::TH1F
dqm::DQMFileSaverPB::getMaxCompressedSize
int getMaxCompressedSize(int bufferSize) const
Definition: DQMFileSaverPB.cc:337
dqm::DQMFileSaverPB::DQMFileSaverPB
DQMFileSaverPB(const edm::ParameterSet &ps)
Definition: DQMFileSaverPB.cc:33
MonitorElementData::Kind::STRING
edm::LogInfo
Definition: MessageLogger.h:254
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
dqm::DQMFileSaverPB::fakeFilterUnitMode_
bool fakeFilterUnitMode_
Definition: DQMFileSaverPB.h:35
timingPdfMaker.histo
histo
Definition: timingPdfMaker.py:279
DQMStore.h
personalPlayback.fp
fp
Definition: personalPlayback.py:523
DQMFileSaverPB.h
edm::ParameterSet::getUntrackedParameter
T getUntrackedParameter(std::string const &, T const &) const
dqm::implementation::IGetter::getAllContents
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
Definition: DQMStore.cc:609
edmScanValgrind.buffer
buffer
Definition: edmScanValgrind.py:171
ROOTFilePB.pb.h
evf::EvFDaqDirector::getInitFilePath
std::string getInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:439
query.host
host
Definition: query.py:115
options
Definition: options.py:1
evf::FastMonitoringService
Definition: FastMonitoringService.h:69
MakerMacros.h
dqmstorepb::ROOTFilePB_Histo
Definition: ROOTFilePB.pb.h:74
DEFINE_FWK_MODULE
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
dqm::legacy::DQMStore
Definition: DQMStore.h:727
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1111
Service.h
dqm-mbProfile.format
format
Definition: dqm-mbProfile.py:16
DQMNet::DQM_PROP_EFFICIENCY_PLOT
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:64
str
#define str(s)
Definition: TestProcessor.cc:48
dqm::DQMFileSaverPB::savePB
void savePB(DQMStore *store, std::string const &filename, int run, int lumi) const
Definition: DQMFileSaverPB.cc:224
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
dqm::DQMFileSaverPB::tag_
std::string tag_
Definition: DQMFileSaverPB.h:37
dqm::DQMFileSaverBase
Definition: DQMFileSaverBase.h:23
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::ParameterSetDescription::addUntracked
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
Definition: ParameterSetDescription.h:100
edm::ParameterSet
Definition: ParameterSet.h:36
edm::ParameterSetDescription::setComment
void setComment(std::string const &value)
Definition: ParameterSetDescription.cc:33
dqm::DQMFileSaverPB::transferDestination_
std::string transferDestination_
Definition: DQMFileSaverPB.h:38
dqm::DQMFileSaverPB::mergeType_
std::string mergeType_
Definition: DQMFileSaverPB.h:39
dqm::DQMFileSaverPB::compressME
ulong compressME(const TBufferFile &buffer, int maxOutputSize, char *compression_output) const
Definition: DQMFileSaverPB.cc:346
edm::Service
Definition: Service.h:30
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
evf::EvFDaqDirector
Definition: EvFDaqDirector.h:62
dqm::DQMFileSaverPB
Definition: DQMFileSaverPB.h:15
dqmstorepb::ROOTFilePB
Definition: ROOTFilePB.pb.h:271
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
dqm::DQMFileSaverPB::~DQMFileSaverPB
~DQMFileSaverPB() override
dqm::DQMFileSaverPB::saveRun
void saveRun(const FileParameters &fp) const override
Definition: DQMFileSaverPB.cc:121
dqm::DQMFileSaverPB::saveLumi
void saveLumi(const FileParameters &fp) const override
Definition: DQMFileSaverPB.cc:64
writedatasetfile.run
run
Definition: writedatasetfile.py:27
varParsingExample.inputFiles
inputFiles
Definition: varParsingExample.py:6
dqm::DQMFileSaverPB::fillJson
static boost::property_tree::ptree fillJson(int run, int lumi, const std::string &dataFilePathName, const std::string &transferDestinationStr, const std::string &mergeTypeStr, evf::FastMonitoringService *fms)
Definition: DQMFileSaverPB.cc:125
Exception
Definition: hltDiff.cc:246
AlcaSiPixelAliHarvester0T_cff.options
options
Definition: AlcaSiPixelAliHarvester0T_cff.py:42
dqm::DQMFileSaverPB::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DQMFileSaverPB.cc:207
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
DQMNet::DQM_PROP_LUMI
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:61
mps_fire.result
result
Definition: mps_fire.py:303
dqm::DQMFileSaverPB::streamLabel_
std::string streamLabel_
Definition: DQMFileSaverPB.h:36
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
dqm
Definition: DQMStore.h:18
ParameterSet.h
hlt_dqm_clientPB-live_cfg.me
me
Definition: hlt_dqm_clientPB-live_cfg.py:61
dqm::DQMFileSaverBase::fillDescription
static void fillDescription(edm::ParameterSetDescription &d)
Definition: DQMFileSaverBase.cc:138
edm::ConfigurationDescriptions::addDefault
void addDefault(ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:99
lumi
Definition: LumiSectionData.h:20
HLT_2018_cff.flags
flags
Definition: HLT_2018_cff.py:11758
evf::MergeTypePB
Definition: EvFDaqDirector.h:58
dqm::DQMFileSaverPB::initRun
void initRun() const override
Definition: DQMFileSaverPB.cc:50