10 #include <boost/property_tree/json_parser.hpp>
11 #include <openssl/md5.h>
12 #include <fmt/printf.h>
14 #include <google/protobuf/io/coded_stream.h>
15 #include <google/protobuf/io/gzip_stream.h>
16 #include <google/protobuf/io/zero_copy_stream_impl.h>
20 #include <TBufferFile.h>
43 if (tag_ !=
"UNKNOWN") {
44 streamLabel_ =
"DQMLive";
59 std::ofstream
file(initFileName);
66 namespace bpt = boost::property_tree;
80 std::filesystem::create_directories(runDir);
84 if (
tag_ ==
"UNKNOWN") {
90 jsonFilePathName = baseName +
".jsn";
91 openJsonFilePathName = jsonFilePathName +
".open";
93 histoFilePathName = baseName +
".pb";
94 openHistoFilePathName = histoFilePathName +
".open";
99 openHistoFilePathName =
111 ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
116 write_json(openJsonFilePathName, pt);
117 ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
130 namespace bpt = boost::property_tree;
131 namespace bfs = std::filesystem;
137 hostnameReturn = gethostname(host,
sizeof(host));
138 if (hostnameReturn == -1)
139 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get host name";
142 std::ostringstream oss_pid;
149 struct stat dataFileStat;
150 dataFileStat.st_size = 0;
152 if (
stat(dataFilePathName.c_str(), &dataFileStat) != 0)
153 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get data file: " << dataFilePathName;
155 dataFileName =
bfs::path(dataFilePathName).filename().string();
159 bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize,
inputFiles, fileAdler32,
160 transferDestination, mergeType, hltErrorEvents;
162 processedEvents.put(
"", nProcessed);
163 acceptedEvents.put(
"", nProcessed);
165 errorEvents.put(
"", 0);
167 fileList.put(
"", dataFileName);
168 fileSize.put(
"", dataFileStat.st_size);
169 inputFiles.put(
"",
"");
170 fileAdler32.put(
"", -1);
171 transferDestination.put(
"", transferDestinationStr);
172 mergeType.put(
"", mergeTypeStr);
173 hltErrorEvents.put(
"", 0);
175 data.push_back(std::make_pair(
"", processedEvents));
176 data.push_back(std::make_pair(
"", acceptedEvents));
177 data.push_back(std::make_pair(
"", errorEvents));
178 data.push_back(std::make_pair(
"", bitmask));
179 data.push_back(std::make_pair(
"", fileList));
180 data.push_back(std::make_pair(
"", fileSize));
181 data.push_back(std::make_pair(
"", inputFiles));
182 data.push_back(std::make_pair(
"", fileAdler32));
183 data.push_back(std::make_pair(
"", transferDestination));
184 data.push_back(std::make_pair(
"", mergeType));
185 data.push_back(std::make_pair(
"", hltErrorEvents));
187 pt.add_child(
"data", data);
189 if (fms ==
nullptr) {
190 pt.put(
"definition",
"/fakeDefinition.jsn");
196 pt.put(
"definition", outJsonDefName.string());
200 sprintf(sourceInfo,
"%s_%d", host, pid);
201 pt.put(
"source", sourceInfo);
208 desc.
setComment(
"Saves histograms from DQM store, HLT->pb workflow.");
210 desc.
addUntracked<
bool>(
"fakeFilterUnitMode",
false)->setComment(
"If set, EvFDaqDirector is emulated and not used.");
224 using google::protobuf::io::FileOutputStream;
225 using google::protobuf::io::GzipOutputStream;
226 using google::protobuf::io::StringOutputStream;
228 unsigned int nme = 0;
230 dqmstorepb::ROOTFilePB dqmstore_message;
234 for (
auto const me : mes) {
235 TBufferFile
buffer(TBufferFile::kWrite);
237 TObjString object(
me->tagString().c_str());
238 buffer.WriteObject(&
object);
240 buffer.WriteObject(
me->getRootObject());
242 dqmstorepb::ROOTFilePB::Histo&
histo = *dqmstore_message.add_histo();
243 histo.set_full_pathname(
me->getFullname());
245 flags |= (uint32_t)
me->kind();
246 if (
me->getLumiFlag())
248 if (
me->getEfficiencyFlag())
250 histo.set_flags(flags);
251 histo.set_size(buffer.Length());
253 if (
tag_ ==
"UNKNOWN") {
254 histo.set_streamed_histo((
void const*)buffer.Buffer(), buffer.Length());
258 char compression_output[maxOutputSize];
259 uLong total_out = this->
compressME(buffer, maxOutputSize, compression_output);
260 histo.set_streamed_histo(compression_output, total_out);
264 for (
const auto& qr :
me->getQReports()) {
268 std::snprintf(buf,
sizeof(buf),
"qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
269 result =
'<' +
me->getName() +
'.' + qr->getQRName() +
'>';
271 result += qr->getAlgorithm() +
':' + qr->getMessage();
272 result +=
"</" +
me->getName() +
'.' + qr->getQRName() +
'>';
273 TObjString
str(result.c_str());
275 dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
276 TBufferFile qr_buffer(TBufferFile::kWrite);
277 qr_buffer.WriteObject(&
str);
278 qr_histo.set_full_pathname(
me->getFullname() +
'.' + qr->getQRName());
280 qr_histo.set_size(qr_buffer.Length());
283 if (
tag_ ==
"UNKNOWN") {
284 qr_histo.set_streamed_histo((
void const*)qr_buffer.Buffer(), qr_buffer.Length());
288 char compression_output[maxOutputSize];
289 uLong total_out = this->
compressME(qr_buffer, maxOutputSize, compression_output);
290 qr_histo.set_streamed_histo(compression_output, total_out);
305 ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
306 FileOutputStream file_stream(filedescriptor);
307 if (
tag_ ==
"UNKNOWN") {
309 options.format = GzipOutputStream::GZIP;
310 options.compression_level = 1;
311 GzipOutputStream gzip_stream(&file_stream, options);
312 dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
319 dqmstore_message.SerializeToZeroCopyStream(&file_stream);
326 edm::LogInfo(
"DQMFileSaverPB") <<
"savePB: successfully wrote " << nme <<
" objects "
327 <<
"into DQM file '" << filename <<
"'\n";
334 int n16kBlocks = (bufferSize + 16383) / 16384;
335 int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
336 return maxOutputSize;
340 z_stream deflateStream;
341 deflateStream.zalloc = Z_NULL;
342 deflateStream.zfree = Z_NULL;
343 deflateStream.opaque = Z_NULL;
344 deflateStream.avail_in = (uInt)buffer.Length() + 1;
345 deflateStream.next_in = (Bytef*)buffer.Buffer();
346 deflateStream.avail_out = (uInt)maxOutputSize;
347 deflateStream.next_out = (Bytef*)compression_output;
350 deflateInit(&deflateStream, Z_BEST_COMPRESSION);
351 deflate(&deflateStream, Z_FINISH);
352 deflateEnd(&deflateStream);
354 return deflateStream.total_out;
T getUntrackedParameter(std::string const &, T const &) const
std::string transferDestination_
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
#define DEFINE_FWK_MODULE(type)
std::string getInitFilePath(std::string const &stream) const
DQMFileSaverPB(const edm::ParameterSet &ps)
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
~DQMFileSaverPB() override
void setComment(std::string const &value)
void initRun() const override
void addDefault(ParameterSetDescription const &psetDescription)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ulong compressME(const TBufferFile &buffer, int maxOutputSize, char *compression_output) const
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
void savePB(DQMStore *store, std::string const &filename, int run, int lumi) const
Log< level::Info, false > LogInfo
void saveLumi(const FileParameters &fp) const override
char data[epos_bytes_allocation]
void saveRun(const FileParameters &fp) const override
static void fillDescription(edm::ParameterSetDescription &d)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
static boost::property_tree::ptree fillJson(int run, int lumi, const std::string &dataFilePathName, const std::string &transferDestinationStr, const std::string &mergeTypeStr, evf::FastMonitoringService *fms)
static const uint32_t DQM_PROP_LUMI
int getMaxCompressedSize(int bufferSize) const