10 #include <boost/property_tree/json_parser.hpp> 11 #include <openssl/md5.h> 12 #include <fmt/printf.h> 14 #include <google/protobuf/io/coded_stream.h> 15 #include <google/protobuf/io/gzip_stream.h> 16 #include <google/protobuf/io/zero_copy_stream_impl.h> 20 #include <TBufferFile.h> 43 if (
tag_ !=
"UNKNOWN") {
59 std::ofstream
file(initFileName);
66 namespace bpt = boost::property_tree;
80 std::filesystem::create_directories(runDir);
84 if (
tag_ ==
"UNKNOWN") {
85 baseName = fmt::sprintf(
"%s/run%06d_ls%04d_%s", runDir,
fp.run_,
fp.lumi_,
streamLabel_);
90 jsonFilePathName = baseName +
".jsn";
91 openJsonFilePathName = jsonFilePathName +
".open";
93 histoFilePathName = baseName +
".pb";
94 openHistoFilePathName = histoFilePathName +
".open";
99 openHistoFilePathName =
106 bool abortFlag =
false;
109 this->
savePB(&*store, openHistoFilePathName,
fp.run_,
fp.lumi_);
112 ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
120 write_json(openJsonFilePathName,
pt);
121 ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
134 namespace bpt = boost::property_tree;
135 namespace bfs = std::filesystem;
141 hostnameReturn = gethostname(
host,
sizeof(
host));
142 if (hostnameReturn == -1)
143 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get host name";
146 std::ostringstream oss_pid;
153 struct stat dataFileStat;
154 dataFileStat.st_size = 0;
156 if (
stat(dataFilePathName.c_str(), &dataFileStat) != 0)
157 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get data file: " << dataFilePathName;
159 dataFileName =
bfs::path(dataFilePathName).filename().string();
163 bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize,
inputFiles, fileAdler32,
164 transferDestination, mergeType, hltErrorEvents;
166 processedEvents.put(
"", nProcessed);
167 acceptedEvents.put(
"", nProcessed);
169 errorEvents.put(
"", 0);
171 fileList.put(
"", dataFileName);
172 fileSize.put(
"", dataFileStat.st_size);
174 fileAdler32.put(
"", -1);
175 transferDestination.put(
"", transferDestinationStr);
176 mergeType.put(
"", mergeTypeStr);
177 hltErrorEvents.put(
"", 0);
179 data.push_back(std::make_pair(
"", processedEvents));
180 data.push_back(std::make_pair(
"", acceptedEvents));
181 data.push_back(std::make_pair(
"", errorEvents));
182 data.push_back(std::make_pair(
"", bitmask));
183 data.push_back(std::make_pair(
"", fileList));
184 data.push_back(std::make_pair(
"", fileSize));
186 data.push_back(std::make_pair(
"", fileAdler32));
187 data.push_back(std::make_pair(
"", transferDestination));
188 data.push_back(std::make_pair(
"", mergeType));
189 data.push_back(std::make_pair(
"", hltErrorEvents));
191 pt.add_child(
"data",
data);
193 if (fms ==
nullptr) {
194 pt.put(
"definition",
"/fakeDefinition.jsn");
200 pt.put(
"definition", outJsonDefName.string());
204 sprintf(sourceInfo,
"%s_%d",
host, pid);
205 pt.put(
"source", sourceInfo);
212 desc.setComment(
"Saves histograms from DQM store, HLT->pb workflow.");
214 desc.addUntracked<
bool>(
"fakeFilterUnitMode",
false)->setComment(
"If set, EvFDaqDirector is emulated and not used.");
216 desc.addUntracked<
std::string>(
"streamLabel",
"streamDQMHistograms")->setComment(
"Label of the stream.");
228 using google::protobuf::io::FileOutputStream;
229 using google::protobuf::io::GzipOutputStream;
230 using google::protobuf::io::StringOutputStream;
232 unsigned int nme = 0;
238 for (
auto const me : mes) {
239 TBufferFile
buffer(TBufferFile::kWrite);
241 TObjString
object(
me->tagString().c_str());
242 buffer.WriteObject(&
object);
244 buffer.WriteObject(
me->getRootObject());
246 dqmstorepb::ROOTFilePB::Histo&
histo = *dqmstore_message.add_histo();
247 histo.set_full_pathname(
me->getFullname());
250 if (
me->getLumiFlag())
252 if (
me->getEfficiencyFlag())
257 if (
tag_ ==
"UNKNOWN") {
262 std::vector<char> compression_output(maxOutputSize);
263 uLong total_out = this->
compressME(buffer, maxOutputSize, compression_output.data());
264 histo.set_streamed_histo(compression_output.data(), total_out);
268 for (
const auto& qr :
me->getQReports()) {
272 std::snprintf(
buf,
sizeof(
buf),
"qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
273 result =
'<' +
me->getName() +
'.' + qr->getQRName() +
'>';
275 result += qr->getAlgorithm() +
':' + qr->getMessage();
276 result +=
"</" +
me->getName() +
'.' + qr->getQRName() +
'>';
279 dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
280 TBufferFile qr_buffer(TBufferFile::kWrite);
281 qr_buffer.WriteObject(&
str);
282 qr_histo.set_full_pathname(
me->getFullname() +
'.' + qr->getQRName());
284 qr_histo.set_size(qr_buffer.Length());
287 if (
tag_ ==
"UNKNOWN") {
288 qr_histo.set_streamed_histo((
void const*)qr_buffer.Buffer(), qr_buffer.Length());
292 char compression_output[maxOutputSize];
293 uLong total_out = this->
compressME(qr_buffer, maxOutputSize, compression_output);
294 qr_histo.set_streamed_histo(compression_output, total_out);
309 ::open(
filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
310 FileOutputStream file_stream(filedescriptor);
311 if (
tag_ ==
"UNKNOWN") {
313 options.format = GzipOutputStream::GZIP;
315 GzipOutputStream gzip_stream(&file_stream,
options);
316 dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
323 dqmstore_message.SerializeToZeroCopyStream(&file_stream);
330 edm::LogInfo(
"DQMFileSaverPB") <<
"savePB: successfully wrote " << nme <<
" objects " 331 <<
"into DQM file '" <<
filename <<
"'\n";
338 int n16kBlocks = (bufferSize + 16383) / 16384;
339 int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
340 return maxOutputSize;
344 z_stream deflateStream;
345 deflateStream.zalloc = Z_NULL;
346 deflateStream.zfree = Z_NULL;
347 deflateStream.opaque = Z_NULL;
348 deflateStream.avail_in = (uInt)
buffer.Length() + 1;
349 deflateStream.next_in = (Bytef*)
buffer.Buffer();
350 deflateStream.avail_out = (uInt)maxOutputSize;
351 deflateStream.next_out = (Bytef*)compression_output;
354 deflateInit(&deflateStream, Z_BEST_COMPRESSION);
355 deflate(&deflateStream, Z_FINISH);
356 deflateEnd(&deflateStream);
358 return deflateStream.total_out;
std::string transferDestination_
DQMFileSaverPB(const edm::ParameterSet &ps)
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
static const std::string filename(const FileParameters &fp, bool useLumi=false)
T getUntrackedParameter(std::string const &, T const &) const
~DQMFileSaverPB() override
void initRun() const override
void addDefault(ParameterSetDescription const &psetDescription)
void savePB(DQMStore *store, std::string const &filename, int run, int lumi) const
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
#define DEFINE_FWK_MODULE(type)
ulong compressME(const TBufferFile &buffer, int maxOutputSize, char *compression_output) const
int getMaxCompressedSize(int bufferSize) const
std::string getInitFilePath(std::string const &stream) const
Log< level::Info, false > LogInfo
void saveLumi(const FileParameters &fp) const override
char data[epos_bytes_allocation]
void saveRun(const FileParameters &fp) const override
static void fillDescription(edm::ParameterSetDescription &d)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
static boost::property_tree::ptree fillJson(int run, int lumi, const std::string &dataFilePathName, const std::string &transferDestinationStr, const std::string &mergeTypeStr, evf::FastMonitoringService *fms)
static const uint32_t DQM_PROP_LUMI