19 #include "TBufferFile.h"
21 #include <openssl/md5.h>
22 #include <boost/property_tree/json_parser.hpp>
23 #include <boost/filesystem.hpp>
24 #include <boost/format.hpp>
26 #include <google/protobuf/io/coded_stream.h>
27 #include <google/protobuf/io/gzip_stream.h>
28 #include <google/protobuf/io/zero_copy_stream_impl.h>
// NOTE(review): fragment only — the enclosing definition's signature and the
// lines between these statements are not part of this excerpt.
// Branch taken when tag_ is anything other than the literal "UNKNOWN".
43 if (
tag_ !=
"UNKNOWN") {
// Opens initFileName as an output file stream named `file`.
59 std::ofstream
file(initFileName);
// NOTE(review): fragment only — the enclosing definition's signature and several
// intermediate lines are not part of this excerpt.
// Visible flow: create the run directory, derive the JSON/protobuf file names,
// write each file under a ".open" name, then rename it to its final name.
66 namespace bpt = boost::property_tree;
// Creates runDir, including any missing parent directories.
80 boost::filesystem::create_directories(runDir);
// Branch taken when tag_ is exactly "UNKNOWN"; its body is not visible here.
84 if (
tag_ ==
"UNKNOWN") {
// Final JSON name is baseName + ".jsn"; the in-progress name appends ".open".
91 jsonFilePathName = baseName +
".jsn";
92 openJsonFilePathName = jsonFilePathName +
".open";
// Final histogram name is baseName + ".pb"; the in-progress name appends ".open".
94 histoFilePathName = baseName +
".pb";
95 openHistoFilePathName = histoFilePathName +
".open";
// NOTE(review): the right-hand side of this reassignment is not visible in this excerpt.
100 openHistoFilePathName =
// Serializes the store into the in-progress histogram file for fp.run_ / fp.lumi_.
109 this->
savePB(&*store, openHistoFilePathName,
fp.run_,
fp.lumi_);
// Moves the finished histogram file to its final name.
// NOTE(review): the return value of ::rename is not checked here.
112 ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
// Writes the property tree `pt` as JSON to the in-progress JSON file.
117 write_json(openJsonFilePathName,
pt);
// Moves the finished JSON file to its final name.
// NOTE(review): the return value of ::rename is not checked here.
118 ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
// NOTE(review): fragment only — the signature is not part of this excerpt; the
// name "fillJson" is taken from the exception category used below.
// Visible flow: look up the host name, stat the data file, build a "data" list of
// single-value nodes, then set the "definition" and "source" entries on `pt`.
131 namespace bpt = boost::property_tree;
132 namespace bfs = boost::filesystem;
// Fetches the host name into `host`; -1 is treated as failure and raises cms::Exception.
138 hostnameReturn = gethostname(
host,
sizeof(
host));
139 if (hostnameReturn == -1)
140 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get host name";
143 std::ostringstream oss_pid;
// st_size is preset to 0 before stat() is called on the data file.
150 struct stat dataFileStat;
151 dataFileStat.st_size = 0;
// A non-zero stat() result raises cms::Exception naming the file path.
153 if (
stat(dataFilePathName.c_str(), &dataFileStat) != 0)
154 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get data file: " << dataFilePathName;
// Keeps only the final path component of dataFilePathName.
156 dataFileName =
bfs::path(dataFilePathName).filename().string();
// One property-tree node per entry of the "data" list.
160 bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize,
inputFiles, fileAdler32,
161 transferDestination, mergeType, hltErrorEvents;
163 processedEvents.put(
"", nProcessed);
// NOTE(review): acceptedEvents is filled from nProcessed, the same value as
// processedEvents — confirm this is intentional.
164 acceptedEvents.put(
"", nProcessed);
166 errorEvents.put(
"", 0);
168 fileList.put(
"", dataFileName);
// File size in bytes as reported by stat().
169 fileSize.put(
"", dataFileStat.st_size);
// NOTE(review): -1 presumably marks "no checksum available" — confirm with the consumer.
171 fileAdler32.put(
"", -1);
172 transferDestination.put(
"", transferDestinationStr);
173 mergeType.put(
"", mergeTypeStr);
174 hltErrorEvents.put(
"", 0);
// Children are appended with empty keys, which Boost's JSON writer emits as array
// elements; the push order below is therefore the order in the output array.
176 data.push_back(std::make_pair(
"", processedEvents));
177 data.push_back(std::make_pair(
"", acceptedEvents));
178 data.push_back(std::make_pair(
"", errorEvents));
179 data.push_back(std::make_pair(
"", bitmask));
180 data.push_back(std::make_pair(
"", fileList));
181 data.push_back(std::make_pair(
"", fileSize));
183 data.push_back(std::make_pair(
"", fileAdler32));
184 data.push_back(std::make_pair(
"", transferDestination));
185 data.push_back(std::make_pair(
"", mergeType));
186 data.push_back(std::make_pair(
"", hltErrorEvents));
188 pt.add_child(
"data",
data);
// When fms is null a placeholder definition path is stored.
190 if (fms ==
nullptr) {
191 pt.put(
"definition",
"/fakeDefinition.jsn");
// NOTE(review): presumably the non-null branch — the surrounding braces/else are
// not visible in this excerpt.
197 pt.put(
"definition", outJsonDefName.string());
// Formats "<host>_<pid>" into sourceInfo.
// NOTE(review): sprintf is unbounded and sourceInfo's size is not visible here;
// consider snprintf with sizeof(sourceInfo).
201 sprintf(sourceInfo,
"%s_%d",
host, pid);
202 pt.put(
"source", sourceInfo);
// NOTE(review): fragment only — the enclosing definition is not part of this excerpt.
// Sets the descriptive comment on the parameter-set description `desc`.
209 desc.
setComment(
"Saves histograms from DQM store, HLT->pb workflow.");
// Declares the untracked bool parameter "fakeFilterUnitMode" with default false,
// and attaches its explanatory comment.
211 desc.
addUntracked<
bool>(
"fakeFilterUnitMode",
false)->setComment(
"If set, EvFDaqDirector is emulated and not used.");
// NOTE(review): fragment only — the signature and many intermediate lines are not
// part of this excerpt; the name "savePB" is taken from the log message below.
// Visible flow: for each monitor element in `mes`, stream it (and each of its
// quality reports) into a message entry, then write the whole message to
// `filename`, gzip-wrapped when tag_ is "UNKNOWN".
225 using google::protobuf::io::FileOutputStream;
226 using google::protobuf::io::GzipOutputStream;
227 using google::protobuf::io::StringOutputStream;
// Object counter reported in the final log message; its increment is not visible here.
229 unsigned int nme = 0;
// `me` is taken by value and used through `->` below.
235 for (
auto const me : mes) {
// Write-mode ROOT buffer that receives the serialized objects for this element.
236 TBufferFile
buffer(TBufferFile::kWrite);
// The element's tag string is written first, wrapped in a TObjString...
238 TObjString
object(
me->tagString().c_str());
239 buffer.WriteObject(&
object);
// ...followed by the element's ROOT object.
241 buffer.WriteObject(
me->getRootObject());
244 histo.set_full_pathname(
me->getFullname());
// NOTE(review): the bodies of the two flag checks below are not visible in this excerpt.
247 if (
me->getLumiFlag())
249 if (
me->getEfficiencyFlag())
254 if (
tag_ ==
"UNKNOWN") {
// NOTE(review): if maxOutputSize is not a compile-time constant this is a
// variable-length array (a compiler extension in C++) placed on the stack;
// confirm its size is bounded.
260 char compression_output[maxOutputSize];
// compressME's return value is used as the number of bytes to store.
261 uLong total_out = this->
compressME(buffer, maxOutputSize, compression_output);
262 histo.set_streamed_histo(compression_output, total_out);
// One additional entry is produced per quality report of the element.
266 for (
const auto& qr :
me->getQReports()) {
// Formats the status and the result value (DBL_DIG + 2 significant digits) into buf.
270 std::snprintf(
buf,
sizeof(
buf),
"qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
// Opening marker "<name.qrname>".
271 result =
'<' +
me->getName() +
'.' + qr->getQRName() +
'>';
273 result += qr->getAlgorithm() +
':' + qr->getMessage();
// Closing marker "</name.qrname>".
274 result +=
"</" +
me->getName() +
'.' + qr->getQRName() +
'>';
// NOTE(review): `str` is declared on a line not visible in this excerpt.
278 TBufferFile qr_buffer(TBufferFile::kWrite);
279 qr_buffer.WriteObject(&
str);
// The report's path is the element's full name plus ".<qrname>".
280 qr_histo.set_full_pathname(
me->getFullname() +
'.' + qr->getQRName());
// The recorded size is the length of the serialized buffer.
282 qr_histo.set_size(qr_buffer.Length());
// With tag_ "UNKNOWN" the raw buffer bytes are stored directly.
285 if (
tag_ ==
"UNKNOWN") {
286 qr_histo.set_streamed_histo((
void const*)qr_buffer.Buffer(), qr_buffer.Length());
// NOTE(review): presumably the other branch — the buffer is compressed before
// being stored. The same stack-array caveat as above applies.
291 char compression_output[maxOutputSize];
292 uLong total_out = this->
compressME(qr_buffer, maxOutputSize, compression_output);
293 qr_histo.set_streamed_histo(compression_output, total_out);
// Opens `filename` write-only, creating or truncating it, with mode rw-rw-r--.
// NOTE(review): the variable receiving the descriptor and any check of it are
// not visible in this excerpt.
308 ::open(
filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
309 FileOutputStream file_stream(filedescriptor);
// With tag_ "UNKNOWN" the message is serialized through a GZIP-format stream
// layered on the file stream.
310 if (
tag_ ==
"UNKNOWN") {
312 options.format = GzipOutputStream::GZIP;
314 GzipOutputStream gzip_stream(&file_stream,
options);
// NOTE(review): the result of SerializeToZeroCopyStream is not checked here.
315 dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
// NOTE(review): presumably the other branch — the message is serialized straight
// to the file stream; its result is not checked either.
323 dqmstore_message.SerializeToZeroCopyStream(&file_stream);
330 ::close(filedescriptor);
// Reports the object count and the output file name.
333 edm::LogInfo(
"DQMFileSaverPB") <<
"savePB: successfully wrote " << nme <<
" objects "
334 <<
"into DQM file '" <<
filename <<
"'\n";
// NOTE(review): fragment only — the enclosing definition's signature is not part
// of this excerpt.
// Number of 16384-byte blocks needed to hold bufferSize bytes, rounded up.
341 int n16kBlocks = (bufferSize + 16383) / 16384;
// Upper bound on output size: the input size plus 6 bytes plus 5 bytes per block.
// NOTE(review): presumably zlib's documented worst-case deflate overhead — confirm
// against the zlib technical notes or use deflateBound().
342 int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
343 return maxOutputSize;
// NOTE(review): fragment only — the signature is not part of this excerpt; the
// name "compressME" is inferred from call sites that pass (buffer, maxOutputSize,
// compression_output).
// Visible flow: deflate the contents of `buffer` into `compression_output` in a
// single call and return the number of bytes produced.
347 z_stream deflateStream;
// Z_NULL allocator hooks: zlib uses its default allocation routines.
348 deflateStream.zalloc = Z_NULL;
349 deflateStream.zfree = Z_NULL;
350 deflateStream.opaque = Z_NULL;
// NOTE(review): the input length is buffer.Length() + 1, one byte more than the
// reported length — confirm that buffer.Buffer() has that extra byte readable and
// that consumers expect it.
351 deflateStream.avail_in = (uInt)
buffer.Length() + 1;
352 deflateStream.next_in = (Bytef *)
buffer.Buffer();
// Output window: at most maxOutputSize bytes starting at compression_output.
353 deflateStream.avail_out = (uInt)maxOutputSize;
354 deflateStream.next_out = (Bytef *)compression_output;
// Single-shot compression at the highest compression level.
// NOTE(review): the return codes of deflateInit, deflate and deflateEnd are not
// checked; deflate(Z_FINISH) only returns Z_STREAM_END when the output space
// was sufficient.
357 deflateInit(&deflateStream, Z_BEST_COMPRESSION);
358 deflate(&deflateStream, Z_FINISH);
359 deflateEnd(&deflateStream);
// Total number of compressed bytes written to compression_output.
361 return deflateStream.total_out;