80 #include <sys/types.h>
92 #include <google/protobuf/io/coded_stream.h>
93 #include <google/protobuf/io/gzip_stream.h>
94 #include <google/protobuf/io/zero_copy_stream_impl.h>
97 #include <TBufferFile.h>
99 #include <TObjString.h>
101 #include <TProfile.h>
105 #include <sys/prctl.h>
106 #include <sys/wait.h>
109 #define DEBUG(x, msg) \
111 std::cout << "DEBUG: " << msg << std::flush
129 return (diff < 0 ?
true : diff == 0 ? lhs.
objname < rhs.
objname :
false);
132 void add(TObject *obj_to_add)
const {
133 DEBUG(1,
"Merging: " <<
obj->GetName() <<
" << " << obj_to_add->GetName() << std::endl);
135 if (dynamic_cast<TH1 *>(
obj) &&
dynamic_cast<TH1 *
>(obj_to_add)) {
136 dynamic_cast<TH1 *
>(
obj)->Add(dynamic_cast<TH1 *>(obj_to_add));
137 }
else if (dynamic_cast<TObjString *>(
obj) &&
dynamic_cast<TObjString *
>(obj_to_add)) {
139 DEBUG(1,
"Cannot merge (different types): " <<
obj->GetName() <<
" << " << obj_to_add->GetName() << std::endl);
152 using google::protobuf::io::ArrayInputStream;
153 using google::protobuf::io::CodedInputStream;
154 using google::protobuf::io::FileInputStream;
155 using google::protobuf::io::FileOutputStream;
156 using google::protobuf::io::GzipInputStream;
157 using google::protobuf::io::GzipOutputStream;
163 if (buf.Length() == buf.BufferSize())
167 return reinterpret_cast<TObject *
>(buf.ReadObjectAny(
nullptr));
171 size_t slash = h.full_pathname().rfind(
'/');
172 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
173 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
174 dirname.assign(h.full_pathname(), 0, dirpos);
175 objname.assign(h.full_pathname(), namepos, std::string::npos);
176 TBufferFile
buf(TBufferFile::kRead, h.size(), (
void *)h.streamed_histo().data(), kFALSE);
180 std::cerr <<
"Error reading element: " << h.full_pathname() << std::endl;
184 void writeMessageFD(
const dqmstorepb::ROOTFilePB &dqmstore_output_msg,
int out_fd) {
185 FileOutputStream out_stream(out_fd);
187 options.format = GzipOutputStream::GZIP;
188 options.compression_level = 2;
189 GzipOutputStream gzip_stream(&out_stream, options);
190 dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
198 DEBUG(1,
"Writing file" << std::endl);
201 ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
208 auto mi = micromes.begin();
209 auto me = micromes.end();
211 DEBUG(1,
"Streaming ROOT objects" << std::endl);
212 for (; mi !=
me; ++mi) {
213 dqmstorepb::ROOTFilePB::Histo *
h = dqmstore_output_msg.add_histo();
214 DEBUG(2,
"Streaming ROOT object " << mi->fullname() <<
"\n");
215 h->set_full_pathname(mi->fullname());
216 TBufferFile
buffer(TBufferFile::kWrite);
217 buffer.WriteObject(mi->obj);
218 h->set_size(buffer.Length());
219 h->set_flags(mi->flags);
220 h->set_streamed_histo((
const void *)buffer.Buffer(), buffer.Length());
226 DEBUG(1,
"Processing directory " << curdir <<
"\n");
227 file->cd(curdir.c_str());
229 TIter
next(gDirectory->GetListOfKeys());
230 while ((key = (TKey *)
next())) {
231 TObject *
obj = key->ReadObj();
232 if (dynamic_cast<TDirectory *>(obj)) {
234 subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2);
238 subdir += obj->GetName();
240 }
else if ((dynamic_cast<TH1 *>(obj)) || (dynamic_cast<TObjString *>(obj))) {
241 if (dynamic_cast<TH1 *>(obj)) {
245 DEBUG(2, curdir <<
"/" << obj->GetName() <<
"\n");
246 MicroME mme(obj, curdir, obj->GetName());
248 micromes.insert(mme);
254 assert(filenames.size() == 1);
255 TFile
input(filenames[0].c_str());
256 DEBUG(0,
"Encoding file " << filenames[0] << std::endl);
258 dqmstorepb::ROOTFilePB dqmstore_message;
268 assert(filenames.size() == 1);
269 TFile
output(output_filename.c_str(),
"RECREATE");
270 DEBUG(0,
"Converting file " << filenames[0] << std::endl);
271 dqmstorepb::ROOTFilePB dqmstore_message;
273 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
274 FileInputStream
fin(filedescriptor);
275 GzipInputStream
input(&fin);
276 CodedInputStream input_coded(&input);
277 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
278 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
279 std::cout <<
"Fatal Error opening file " << filenames[0] << std::endl;
282 ::close(filedescriptor);
284 for (
int i = 0;
i < dqmstore_message.histo_size();
i++) {
285 const dqmstorepb::ROOTFilePB::Histo &
h = dqmstore_message.histo(
i);
286 DEBUG(1, h.full_pathname() << std::endl);
287 DEBUG(1, h.size() << std::endl);
288 TBufferFile
buf(TBufferFile::kRead, h.size(), (
void *)h.streamed_histo().data(), kFALSE);
296 size_t end = path.find(
'/', start);
297 if (end == std::string::npos)
301 if (!gDirectory->Get(part.c_str()))
302 gDirectory->mkdir(part.c_str());
303 gDirectory->cd(part.c_str());
305 if (end + 1 >= path.size())
309 end = path.find(
'/', start);
310 if (end == std::string::npos)
314 DEBUG(1, obj->GetName() << std::endl);
// dumpFiles: for every entry of `filenames`, open the file, decode it as a
// gzip-compressed dqmstorepb::ROOTFilePB message and report each contained
// histo entry (full pathname, size, object name, flags) through DEBUG.
// NOTE: this listing omits some lines of the function (the embedded listing
// numbers jump, e.g. 332 -> 335 and 341 -> 344); only the visible lines are
// reproduced here.
320 int dumpFiles(
const std::vector<std::string> &filenames) {
321 assert(!filenames.empty());
322 for (
int i = 0,
e = filenames.size();
i !=
e; ++
i) {
323 DEBUG(0,
"Dumping file " << filenames[
i] << std::endl);
324 dqmstorepb::ROOTFilePB dqmstore_message;
// Open the file belonging to the current iteration. The listing opened
// filenames[0] here, so every pass re-read the first file while the log line
// above announced filenames[i].
326 int filedescriptor = ::open(filenames[i].c_str(), O_RDONLY);
// Decoding chain: raw file descriptor -> gzip decompression -> coded stream.
327 FileInputStream
fin(filedescriptor);
328 GzipInputStream
input(&fin);
329 CodedInputStream input_coded(&input);
// Cap the number of bytes the coded stream will read at 1024^3.
330 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
331 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
// Name the file that actually failed to parse (was filenames[0]).
332 std::cout <<
"Fatal Error opening file " << filenames[i] << std::endl;
335 ::close(filedescriptor);
// This inner `i` shadows the file index of the outer loop; from here on `i`
// is the index of the histo entry inside the decoded message.
337 for (
int i = 0;
i < dqmstore_message.histo_size();
i++) {
338 const dqmstorepb::ROOTFilePB::Histo &
h = dqmstore_message.histo(
i);
339 DEBUG(1, h.full_pathname() << std::endl);
340 DEBUG(1, h.size() << std::endl);
// Read-mode buffer over the serialized bytes stored in the message.
341 TBufferFile
buf(TBufferFile::kRead, h.size(), (
void *)h.streamed_histo().data(), kFALSE);
344 DEBUG(1, obj->GetName() << std::endl);
345 DEBUG(1,
"Flags: " << h.flags() << std::endl);
353 dqmstorepb::ROOTFilePB dqmstore_msg;
355 FileInputStream
fin(fd);
356 GzipInputStream
input(&fin);
357 CodedInputStream input_coded(&input);
358 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
359 if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
360 std::cout <<
"Fatal decoding stream: " << fd << std::endl;
364 auto hint = micromes.begin();
365 for (
int i = 0;
i < dqmstore_msg.histo_size();
i++) {
368 TObject *
obj =
nullptr;
369 const dqmstorepb::ROOTFilePB::Histo &
h = dqmstore_msg.histo(
i);
372 MicroME mme(
nullptr, path, objname, h.flags());
373 auto ir = micromes.insert(hint, mme);
374 if (ir->obj !=
nullptr) {
380 DEBUG(2,
"Merged MicroME " << mme.fullname() << std::endl);
383 DEBUG(2,
"Inserted MicroME " << mme.fullname() << std::endl);
398 TH1F obj_th1f(
"preload_th1f",
"preload_th1f", 2, 0, 1);
400 TBufferFile write_buffer(TBufferFile::kWrite);
401 write_buffer.WriteObject(&obj_th1f);
403 dqmstorepb::ROOTFilePB preload_file;
404 dqmstorepb::ROOTFilePB::Histo *hw = preload_file.add_histo();
405 hw->set_size(write_buffer.Length());
407 hw->set_streamed_histo((
const void *)write_buffer.Buffer(), write_buffer.Length());
410 const dqmstorepb::ROOTFilePB::Histo &hr = preload_file.histo(0);
413 TObject *
obj =
nullptr;
423 const int fork_total,
424 const std::vector<std::string> &filenames) {
425 DEBUG(1,
"Start process: " << fork_id <<
" parent: " << (fork_id / 2) << std::endl);
427 std::list<std::pair<int, int> > children;
430 for (
int i = 0;
i < 2; ++
i) {
431 int child_id = fork_id * 2 +
i;
432 if (child_id > fork_total)
438 int child_pid = ::fork();
439 if (child_pid == 0) {
440 ::prctl(PR_SET_PDEATHSIG, SIGKILL);
449 children.push_back(std::make_pair(fd[0], child_pid));
458 for (
unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
460 DEBUG(1,
"Adding file " << file << std::endl);
463 if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) {
464 std::cout <<
"Fatal Error opening file " << file << std::endl;
469 addFile(microme, filedescriptor);
470 ::close(filedescriptor);
474 for (
auto &chpair : children) {
475 int fd = chpair.first;
481 ::waitpid(chpair.second, &status, 0);
485 dqmstorepb::ROOTFilePB dqmstore_output_msg;
493 DEBUG(1,
"Writing file" << std::endl);
495 ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
506 std::cerr <<
"Usage: " << app_name <<
" [--[no-]debug] TASK OPTIONS\n\n " << app_name
507 <<
" [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n " << app_name
508 <<
" [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " << app_name
509 <<
" [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " << app_name <<
" [OPTIONS] dump [DAT FILE...]\n ";
518 std::vector<std::string> filenames;
521 filenames.reserve(argc);
523 for (arg = 1; arg <
argc; ++
arg) {
524 if (!strcmp(argv[arg],
"--no-debug"))
526 else if (!strcmp(argv[arg],
"--debug") || !strcmp(argv[arg],
"-d"))
533 if (!strcmp(argv[arg],
"add")) {
536 }
else if (!strcmp(argv[arg],
"dump")) {
539 }
else if (!strcmp(argv[arg],
"convert")) {
542 }
else if (!strcmp(argv[arg],
"encode")) {
546 std::cerr <<
"Unknown action: " << argv[
arg] << std::endl;
// Optional "-j NUM" argument: number of jobs, accepted range 1..128.
555 if ((arg != argc) && (strcmp(argv[arg],
"-j") == 0)) {
// The guard above only proves that argv[arg] exists. When "-j" is the last
// argument, argv[arg + 1] is argv[argc], a null pointer, which must not be
// handed to atoi. In that case use 0 so the range check below rejects it
// with the existing "Invalid argument for -j" message.
556 jobs = (arg + 1 < argc) ? atoi(argv[arg + 1]) : 0;
558 if ((jobs < 1) || (jobs > 128)) {
559 std::cerr <<
"Invalid argument for -j\n";
569 std::cerr <<
"add|convert|encode actions requires a -o option to be set\n";
572 if (!strcmp(argv[arg],
"-o")) {
573 if (arg < argc - 1) {
574 output_file = argv[++
arg];
576 std::cerr <<
" -o option requires a value\n";
586 filenames.emplace_back(argv[arg]);
596 filenames.emplace_back(argv[arg]);
601 ret =
addFiles(output_file, filenames, jobs);
609 google::protobuf::ShutdownProtobufLibrary();
const std::string fullname() const
tuple ret
prodAgent to be discontinued
const std::string dirname
void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes)
static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj)
HitsTree SetDirectory(Hit_Tree)
TObject * extractNextObject(TBufferFile &buf)
bool operator<(const MicroME &rhs) const
void add(TObject *obj_to_add) const
static std::string const input
tuple key
prepare the HTCondor submission files and eventually submit them
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int nthreads
void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd)
int encodeFile(const std::string &output_filename, const std::vector< std::string > &filenames)
void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes)
void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename)
const std::string objname
int convertFile(const std::string &output_filename, const std::vector< std::string > &filenames)
MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags=0)
std::set< MicroME > MEStore
int addFiles(const std::string &output_filename, const std::vector< std::string > &filenames, int nthreads)
int addFile(MEStore &micromes, int fd)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector< std::string > &filenames)
int dumpFiles(const std::vector< std::string > &filenames)