10 #include <boost/property_tree/json_parser.hpp>
11 #include <openssl/md5.h>
12 #include <fmt/printf.h>
14 #include <google/protobuf/io/coded_stream.h>
15 #include <google/protobuf/io/gzip_stream.h>
16 #include <google/protobuf/io/zero_copy_stream_impl.h>
20 #include <TBufferFile.h>
43 if (
tag_ !=
"UNKNOWN") {
59 std::ofstream
file(initFileName);
66 namespace bpt = boost::property_tree;
80 std::filesystem::create_directories(runDir);
84 if (
tag_ ==
"UNKNOWN") {
85 baseName = fmt::sprintf(
"%s/run%06d_ls%04d_%s", runDir,
fp.run_,
fp.lumi_,
streamLabel_);
90 jsonFilePathName = baseName +
".jsn";
91 openJsonFilePathName = jsonFilePathName +
".open";
93 histoFilePathName = baseName +
".pb";
94 openHistoFilePathName = histoFilePathName +
".open";
99 openHistoFilePathName =
108 this->
savePB(&*store, openHistoFilePathName,
fp.run_,
fp.lumi_);
111 ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
116 write_json(openJsonFilePathName,
pt);
117 ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
130 namespace bpt = boost::property_tree;
131 namespace bfs = std::filesystem;
137 hostnameReturn = gethostname(
host,
sizeof(
host));
138 if (hostnameReturn == -1)
139 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get host name";
142 std::ostringstream oss_pid;
149 struct stat dataFileStat;
150 dataFileStat.st_size = 0;
152 if (
stat(dataFilePathName.c_str(), &dataFileStat) != 0)
153 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get data file: " << dataFilePathName;
155 dataFileName =
bfs::path(dataFilePathName).filename().string();
159 bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize,
inputFiles, fileAdler32,
160 transferDestination, mergeType, hltErrorEvents;
162 processedEvents.put(
"", nProcessed);
163 acceptedEvents.put(
"", nProcessed);
165 errorEvents.put(
"", 0);
167 fileList.put(
"", dataFileName);
168 fileSize.put(
"", dataFileStat.st_size);
170 fileAdler32.put(
"", -1);
171 transferDestination.put(
"", transferDestinationStr);
172 mergeType.put(
"", mergeTypeStr);
173 hltErrorEvents.put(
"", 0);
175 data.push_back(std::make_pair(
"", processedEvents));
176 data.push_back(std::make_pair(
"", acceptedEvents));
177 data.push_back(std::make_pair(
"", errorEvents));
178 data.push_back(std::make_pair(
"", bitmask));
179 data.push_back(std::make_pair(
"", fileList));
180 data.push_back(std::make_pair(
"", fileSize));
182 data.push_back(std::make_pair(
"", fileAdler32));
183 data.push_back(std::make_pair(
"", transferDestination));
184 data.push_back(std::make_pair(
"", mergeType));
185 data.push_back(std::make_pair(
"", hltErrorEvents));
187 pt.add_child(
"data",
data);
189 if (fms ==
nullptr) {
190 pt.put(
"definition",
"/fakeDefinition.jsn");
196 pt.put(
"definition", outJsonDefName.string());
200 sprintf(sourceInfo,
"%s_%d",
host, pid);
201 pt.put(
"source", sourceInfo);
208 desc.setComment(
"Saves histograms from DQM store, HLT->pb workflow.");
210 desc.addUntracked<
bool>(
"fakeFilterUnitMode",
false)->setComment(
"If set, EvFDaqDirector is emulated and not used.");
212 desc.addUntracked<
std::string>(
"streamLabel",
"streamDQMHistograms")->setComment(
"Label of the stream.");
224 using google::protobuf::io::FileOutputStream;
225 using google::protobuf::io::GzipOutputStream;
226 using google::protobuf::io::StringOutputStream;
228 unsigned int nme = 0;
234 for (
auto const me : mes) {
235 TBufferFile
buffer(TBufferFile::kWrite);
237 TObjString
object(
me->tagString().c_str());
238 buffer.WriteObject(&
object);
240 buffer.WriteObject(
me->getRootObject());
243 histo.set_full_pathname(
me->getFullname());
246 if (
me->getLumiFlag())
248 if (
me->getEfficiencyFlag())
253 if (
tag_ ==
"UNKNOWN") {
258 char compression_output[maxOutputSize];
259 uLong total_out = this->
compressME(buffer, maxOutputSize, compression_output);
260 histo.set_streamed_histo(compression_output, total_out);
264 for (
const auto& qr :
me->getQReports()) {
268 std::snprintf(
buf,
sizeof(
buf),
"qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
269 result =
'<' +
me->getName() +
'.' + qr->getQRName() +
'>';
271 result += qr->getAlgorithm() +
':' + qr->getMessage();
272 result +=
"</" +
me->getName() +
'.' + qr->getQRName() +
'>';
276 TBufferFile qr_buffer(TBufferFile::kWrite);
277 qr_buffer.WriteObject(&
str);
278 qr_histo.set_full_pathname(
me->getFullname() +
'.' + qr->getQRName());
280 qr_histo.set_size(qr_buffer.Length());
283 if (
tag_ ==
"UNKNOWN") {
284 qr_histo.set_streamed_histo((
void const*)qr_buffer.Buffer(), qr_buffer.Length());
288 char compression_output[maxOutputSize];
289 uLong total_out = this->
compressME(qr_buffer, maxOutputSize, compression_output);
290 qr_histo.set_streamed_histo(compression_output, total_out);
305 ::open(
filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
306 FileOutputStream file_stream(filedescriptor);
307 if (
tag_ ==
"UNKNOWN") {
309 options.format = GzipOutputStream::GZIP;
311 GzipOutputStream gzip_stream(&file_stream,
options);
312 dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
319 dqmstore_message.SerializeToZeroCopyStream(&file_stream);
326 ::close(filedescriptor);
329 edm::LogInfo(
"DQMFileSaverPB") <<
"savePB: successfully wrote " << nme <<
" objects "
330 <<
"into DQM file '" <<
filename <<
"'\n";
337 int n16kBlocks = (bufferSize + 16383) / 16384;
338 int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
339 return maxOutputSize;
343 z_stream deflateStream;
344 deflateStream.zalloc = Z_NULL;
345 deflateStream.zfree = Z_NULL;
346 deflateStream.opaque = Z_NULL;
347 deflateStream.avail_in = (uInt)
buffer.Length() + 1;
348 deflateStream.next_in = (Bytef*)
buffer.Buffer();
349 deflateStream.avail_out = (uInt)maxOutputSize;
350 deflateStream.next_out = (Bytef*)compression_output;
353 deflateInit(&deflateStream, Z_BEST_COMPRESSION);
354 deflate(&deflateStream, Z_FINISH);
355 deflateEnd(&deflateStream);
357 return deflateStream.total_out;