10 #include <boost/property_tree/json_parser.hpp> 11 #include <openssl/md5.h> 12 #include <fmt/printf.h> 14 #include <google/protobuf/io/coded_stream.h> 15 #include <google/protobuf/io/gzip_stream.h> 16 #include <google/protobuf/io/zero_copy_stream_impl.h> 20 #include <TBufferFile.h> 43 if (
tag_ !=
"UNKNOWN") {
49 throw cms::Exception(
"DQMFileSaverPB") <<
"EvFDaqDirector is not available";
51 std::ofstream
file(initFileName);
54 <<
"Cannot create INI file: " << initFileName <<
" error: " << strerror(errno);
70 namespace bpt = boost::property_tree;
84 std::filesystem::create_directories(runDir);
88 if (
tag_ ==
"UNKNOWN") {
89 baseName = fmt::sprintf(
"%s/run%06d_ls%04d_%s", runDir,
fp.run_,
fp.lumi_,
streamLabel_);
94 jsonFilePathName = baseName +
".jsn";
95 openJsonFilePathName = jsonFilePathName +
".open";
97 histoFilePathName = baseName +
".pb";
98 openHistoFilePathName = histoFilePathName +
".open";
103 openHistoFilePathName =
110 bool abortFlag =
false;
113 this->
savePB(&*store, openHistoFilePathName,
fp.run_,
fp.lumi_);
116 ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
124 write_json(openJsonFilePathName,
pt);
125 ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
138 namespace bpt = boost::property_tree;
139 namespace bfs = std::filesystem;
145 hostnameReturn = gethostname(
host,
sizeof(
host));
146 if (hostnameReturn == -1)
147 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get host name";
150 std::ostringstream oss_pid;
157 struct stat dataFileStat;
158 dataFileStat.st_size = 0;
160 if (
stat(dataFilePathName.c_str(), &dataFileStat) != 0)
161 throw cms::Exception(
"fillJson") <<
"Internal error, cannot get data file: " << dataFilePathName;
163 dataFileName =
bfs::path(dataFilePathName).filename().string();
167 bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize,
inputFiles, fileAdler32,
168 transferDestination, mergeType, hltErrorEvents;
170 processedEvents.put(
"", nProcessed);
171 acceptedEvents.put(
"", nProcessed);
173 errorEvents.put(
"", 0);
175 fileList.put(
"", dataFileName);
176 fileSize.put(
"", dataFileStat.st_size);
178 fileAdler32.put(
"", -1);
179 transferDestination.put(
"", transferDestinationStr);
180 mergeType.put(
"", mergeTypeStr);
181 hltErrorEvents.put(
"", 0);
183 data.push_back(std::make_pair(
"", processedEvents));
184 data.push_back(std::make_pair(
"", acceptedEvents));
185 data.push_back(std::make_pair(
"", errorEvents));
186 data.push_back(std::make_pair(
"", bitmask));
187 data.push_back(std::make_pair(
"", fileList));
188 data.push_back(std::make_pair(
"", fileSize));
190 data.push_back(std::make_pair(
"", fileAdler32));
191 data.push_back(std::make_pair(
"", transferDestination));
192 data.push_back(std::make_pair(
"", mergeType));
193 data.push_back(std::make_pair(
"", hltErrorEvents));
195 pt.add_child(
"data",
data);
197 if (fms ==
nullptr) {
198 pt.put(
"definition",
"/fakeDefinition.jsn");
204 pt.put(
"definition", outJsonDefName.string());
208 sprintf(sourceInfo,
"%s_%d",
host, pid);
209 pt.put(
"source", sourceInfo);
216 desc.setComment(
"Saves histograms from DQM store, HLT->pb workflow.");
218 desc.addUntracked<
bool>(
"fakeFilterUnitMode",
false)->setComment(
"If set, EvFDaqDirector is emulated and not used.");
220 desc.addUntracked<
std::string>(
"streamLabel",
"streamDQMHistograms")->setComment(
"Label of the stream.");
232 using google::protobuf::io::FileOutputStream;
233 using google::protobuf::io::GzipOutputStream;
234 using google::protobuf::io::StringOutputStream;
236 unsigned int nme = 0;
242 for (
auto const me : mes) {
243 TBufferFile
buffer(TBufferFile::kWrite);
245 TObjString
object(
me->tagString().c_str());
246 buffer.WriteObject(&
object);
248 buffer.WriteObject(
me->getRootObject());
250 dqmstorepb::ROOTFilePB::Histo&
histo = *dqmstore_message.add_histo();
251 histo.set_full_pathname(
me->getFullname());
254 if (
me->getLumiFlag())
256 if (
me->getEfficiencyFlag())
261 if (
tag_ ==
"UNKNOWN") {
266 std::vector<char> compression_output(maxOutputSize);
267 uLong total_out = this->
compressME(buffer, maxOutputSize, compression_output.data());
268 histo.set_streamed_histo(compression_output.data(), total_out);
272 for (
const auto& qr :
me->getQReports()) {
276 std::snprintf(
buf,
sizeof(
buf),
"qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
277 result =
'<' +
me->getName() +
'.' + qr->getQRName() +
'>';
279 result += qr->getAlgorithm() +
':' + qr->getMessage();
280 result +=
"</" +
me->getName() +
'.' + qr->getQRName() +
'>';
283 dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
284 TBufferFile qr_buffer(TBufferFile::kWrite);
285 qr_buffer.WriteObject(&
str);
286 qr_histo.set_full_pathname(
me->getFullname() +
'.' + qr->getQRName());
288 qr_histo.set_size(qr_buffer.Length());
291 if (
tag_ ==
"UNKNOWN") {
292 qr_histo.set_streamed_histo((
void const*)qr_buffer.Buffer(), qr_buffer.Length());
296 char compression_output[maxOutputSize];
297 uLong total_out = this->
compressME(qr_buffer, maxOutputSize, compression_output);
298 qr_histo.set_streamed_histo(compression_output, total_out);
313 ::open(
filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
314 FileOutputStream file_stream(filedescriptor);
315 if (
tag_ ==
"UNKNOWN") {
317 options.format = GzipOutputStream::GZIP;
319 GzipOutputStream gzip_stream(&file_stream,
options);
320 dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
327 dqmstore_message.SerializeToZeroCopyStream(&file_stream);
334 edm::LogInfo(
"DQMFileSaverPB") <<
"savePB: successfully wrote " << nme <<
" objects " 335 <<
"into DQM file '" <<
filename <<
"'\n";
342 int n16kBlocks = (bufferSize + 16383) / 16384;
343 int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
344 return maxOutputSize;
348 z_stream deflateStream;
349 deflateStream.zalloc = Z_NULL;
350 deflateStream.zfree = Z_NULL;
351 deflateStream.opaque = Z_NULL;
352 deflateStream.avail_in = (uInt)
buffer.Length() + 1;
353 deflateStream.next_in = (Bytef*)
buffer.Buffer();
354 deflateStream.avail_out = (uInt)maxOutputSize;
355 deflateStream.next_out = (Bytef*)compression_output;
358 deflateInit(&deflateStream, Z_BEST_COMPRESSION);
359 deflate(&deflateStream, Z_FINISH);
360 deflateEnd(&deflateStream);
362 return deflateStream.total_out;
std::string transferDestination_
DQMFileSaverPB(const edm::ParameterSet &ps)
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
static const std::string filename(const FileParameters &fp, bool useLumi=false)
T getUntrackedParameter(std::string const &, T const &) const
~DQMFileSaverPB() override
void initRun() const override
void addDefault(ParameterSetDescription const &psetDescription)
void savePB(DQMStore *store, std::string const &filename, int run, int lumi) const
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
#define DEFINE_FWK_MODULE(type)
ulong compressME(const TBufferFile &buffer, int maxOutputSize, char *compression_output) const
int getMaxCompressedSize(int bufferSize) const
Log< level::Info, false > LogInfo
void saveLumi(const FileParameters &fp) const override
char data[epos_bytes_allocation]
void saveRun(const FileParameters &fp) const override
static void fillDescription(edm::ParameterSetDescription &d)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
static boost::property_tree::ptree fillJson(int run, int lumi, const std::string &dataFilePathName, const std::string &transferDestinationStr, const std::string &mergeTypeStr, evf::FastMonitoringService *fms)
static const uint32_t DQM_PROP_LUMI