80 #include <sys/types.h> 92 #include <google/protobuf/io/coded_stream.h> 93 #include <google/protobuf/io/gzip_stream.h> 94 #include <google/protobuf/io/zero_copy_stream_impl.h> 97 #include <TBufferFile.h> 99 #include <TObjString.h> 101 #include <TProfile.h> 105 #include <sys/prctl.h> 106 #include <sys/wait.h> 109 #define DEBUG(x, msg) if (debug >= x) std::cout << "DEBUG: " << msg << std::flush 131 return (diff < 0 ?
true 135 void add(TObject *obj_to_add)
const {
136 DEBUG(1,
"Merging: " << obj->GetName() <<
137 " << " << obj_to_add->GetName() << std::endl);
139 if (dynamic_cast<TH1 *>(obj) &&
dynamic_cast<TH1 *
>(obj_to_add)) {
140 dynamic_cast<TH1 *
>(
obj)->Add(dynamic_cast<TH1 *>(obj_to_add));
141 }
else if (dynamic_cast<TObjString *>(obj) &&
dynamic_cast<TObjString *
>(obj_to_add)) {
145 DEBUG(1,
"Cannot merge (different types): " << obj->GetName() <<
146 " << " << obj_to_add->GetName() << std::endl);
151 return dirname +
'/' +
objname;
170 using google::protobuf::io::FileInputStream;
171 using google::protobuf::io::FileOutputStream;
172 using google::protobuf::io::GzipInputStream;
173 using google::protobuf::io::GzipOutputStream;
174 using google::protobuf::io::CodedInputStream;
175 using google::protobuf::io::ArrayInputStream;
182 if (buf.Length() == buf.BufferSize())
186 return reinterpret_cast<TObject *
>(buf.ReadObjectAny(
nullptr));
195 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
196 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
198 objname.assign(h.
full_pathname(), namepos, std::string::npos);
199 TBufferFile buf(TBufferFile::kRead, h.
size(),
210 FileOutputStream out_stream(out_fd);
212 options.format = GzipOutputStream::GZIP;
213 options.compression_level = 2;
214 GzipOutputStream gzip_stream(&out_stream,
216 dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
226 DEBUG(1,
"Writing file" << std::endl);
228 int out_fd = ::open(output_filename.c_str(),
229 O_WRONLY | O_CREAT | O_TRUNC,
241 auto mi = micromes.begin();
242 auto me = micromes.end();
244 DEBUG(1,
"Streaming ROOT objects" << std::endl);
245 for (; mi != me; ++mi) {
247 DEBUG(2,
"Streaming ROOT object " << mi->fullname() <<
"\n");
249 TBufferFile
buffer(TBufferFile::kWrite);
250 buffer.WriteObject(mi->obj);
263 DEBUG(1,
"Processing directory " << curdir <<
"\n");
264 file->cd(curdir.c_str());
266 TIter
next (gDirectory->GetListOfKeys());
267 while ((key = (TKey *)
next())) {
268 TObject *
obj = key->ReadObj();
269 if (dynamic_cast<TDirectory *>(obj)) {
271 subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2);
273 if (! curdir.empty())
275 subdir += obj->GetName();
277 }
else if ((dynamic_cast<TH1 *>(obj)) || (dynamic_cast<TObjString *>(obj))) {
278 if (dynamic_cast<TH1 *>(obj)) {
279 dynamic_cast<TH1 *
>(
obj)->SetDirectory(
nullptr);
282 DEBUG(2, curdir <<
"/" << obj->GetName() <<
"\n");
283 MicroME mme(obj, curdir, obj->GetName());
285 micromes.insert(mme);
292 const std::vector<std::string> &
filenames) {
293 assert(filenames.size() == 1);
294 TFile
input(filenames[0].c_str());
295 DEBUG(0,
"Encoding file " << filenames[0] << std::endl);
307 const std::vector<std::string> &
filenames) {
308 assert(filenames.size() == 1);
309 TFile
output(output_filename.c_str(),
"RECREATE");
310 DEBUG(0,
"Converting file " << filenames[0] << std::endl);
313 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
314 FileInputStream
fin(filedescriptor);
315 GzipInputStream
input(&fin);
316 CodedInputStream input_coded(&input);
317 input_coded.SetTotalBytesLimit(1024*1024*1024, -1);
318 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
320 << filenames[0] << std::endl;
323 ::close(filedescriptor);
329 TBufferFile buf(TBufferFile::kRead, h.
size(),
339 size_t end = path.find(
'/', start);
340 if (end == std::string::npos)
345 if (! gDirectory->Get(part.c_str()))
346 gDirectory->mkdir(part.c_str());
347 gDirectory->cd(part.c_str());
349 if (end+1 >= path.size())
353 end = path.find(
'/', start);
354 if (end == std::string::npos)
358 DEBUG(1, obj->GetName() << std::endl);
365 assert(!filenames.empty());
366 for (
int i = 0,
e = filenames.size();
i !=
e; ++
i) {
367 DEBUG(0,
"Dumping file " << filenames[
i] << std::endl);
370 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
371 FileInputStream
fin(filedescriptor);
372 GzipInputStream
input(&fin);
373 CodedInputStream input_coded(&input);
374 input_coded.SetTotalBytesLimit(1024*1024*1024, -1);
375 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
377 << filenames[0] << std::endl;
380 ::close(filedescriptor);
386 TBufferFile buf(TBufferFile::kRead, h.
size(),
391 DEBUG(1, obj->GetName() << std::endl);
392 DEBUG(1,
"Flags: " << h.
flags() << std::endl);
402 FileInputStream
fin(fd);
403 GzipInputStream
input(&fin);
404 CodedInputStream input_coded(&input);
405 input_coded.SetTotalBytesLimit(1024*1024*1024, -1);
406 if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
412 auto hint = micromes.begin();
416 TObject *
obj =
nullptr;
421 auto ir = micromes.insert(hint, mme);
422 if (ir->obj !=
nullptr) {
428 DEBUG(2,
"Merged MicroME " << mme.fullname() << std::endl);
431 DEBUG(2,
"Inserted MicroME " << mme.fullname() << std::endl);
446 TH1F obj_th1f(
"preload_th1f",
"preload_th1f", 2, 0, 1);
448 TBufferFile write_buffer(TBufferFile::kWrite);
449 write_buffer.WriteObject(&obj_th1f);
453 hw->
set_size(write_buffer.Length());
461 TObject *
obj =
nullptr;
470 DEBUG(1,
"Start process: " << fork_id <<
" parent: " << (fork_id / 2) << std::endl);
472 std::list<std::pair<int, int> >
children;
475 for (
int i = 0;
i < 2; ++
i) {
476 int child_id = fork_id*2 +
i;
477 if (child_id > fork_total)
483 int child_pid = ::fork();
484 if (child_pid == 0) {
485 ::prctl(PR_SET_PDEATHSIG, SIGKILL);
494 children.push_back(std::make_pair(fd[0], child_pid));
503 for (
unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
505 DEBUG(1,
"Adding file " << file << std::endl);
508 if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) {
510 << file << std::endl;
515 addFile(microme, filedescriptor);
516 ::close(filedescriptor);
520 for (
auto& chpair : children) {
521 int fd = chpair.first;
527 ::waitpid(chpair.second, &status, 0);
537 const std::vector<std::string> &
filenames,
542 DEBUG(1,
"Writing file" << std::endl);
543 int out_fd = ::open(output_filename.c_str(),
544 O_WRONLY | O_CREAT | O_TRUNC,
561 <<
" [--[no-]debug] TASK OPTIONS\n\n " 562 << app_name <<
" [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n " 563 << app_name <<
" [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " 564 << app_name <<
" [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " 565 << app_name <<
" [OPTIONS] dump [DAT FILE...]\n ";
577 filenames.reserve(argc);
579 for (arg = 1; arg <
argc; ++
arg) {
580 if (! strcmp(argv[arg],
"--no-debug"))
582 else if (! strcmp(argv[arg],
"--debug")
583 || ! strcmp(argv[arg],
"-d"))
590 if (! strcmp(argv[arg],
"add")) {
593 }
else if (! strcmp(argv[arg],
"dump")) {
596 }
else if (! strcmp(argv[arg],
"convert")) {
599 }
else if (! strcmp(argv[arg],
"encode")) {
603 std::cerr <<
"Unknown action: " << argv[
arg] << std::endl;
612 if ((arg != argc) && (strcmp(argv[arg],
"-j") == 0)) {
613 jobs = atoi(argv[arg+1]);
615 if ((jobs < 1) || (jobs > 128)) {
616 std::cerr <<
"Invalid argument for -j\n";
626 std::cerr <<
"add|convert|encode actions requires a -o option to be set\n";
629 if (! strcmp(argv[arg],
"-o")) {
631 output_file = argv[++
arg];
633 std::cerr <<
" -o option requires a value\n";
643 filenames.emplace_back(argv[arg]);
653 filenames.emplace_back(argv[arg]);
658 ret =
addFiles(output_file, filenames, jobs);
667 google::protobuf::ShutdownProtobufLibrary();
const std::string fullname() const
::google::protobuf::uint32 size() const
const ::std::string & full_pathname() const
const std::string dirname
void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes)
static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj)
TObject * extractNextObject(TBufferFile &buf)
bool operator<(const MicroME &rhs) const
void add(TObject *obj_to_add) const
void set_flags(::google::protobuf::uint32 value)
const ::std::string & streamed_histo() const
static std::string const input
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
def pipe(cmdline, input=None)
int main(int argc, char *argv[])
void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd)
int encodeFile(const std::string &output_filename, const std::vector< std::string > &filenames)
void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes)
const ::dqmstorepb::ROOTFilePB_Histo & histo(int index) const
void set_size(::google::protobuf::uint32 value)
void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename)
const std::string objname
void set_full_pathname(const ::std::string &value)
int convertFile(const std::string &output_filename, const std::vector< std::string > &filenames)
::dqmstorepb::ROOTFilePB_Histo * add_histo()
MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags=0)
std::set< MicroME > MEStore
::google::protobuf::uint32 flags() const
int addFiles(const std::string &output_filename, const std::vector< std::string > &filenames, int nthreads)
int addFile(MEStore &micromes, int fd)
void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector< std::string > &filenames)
int dumpFiles(const std::vector< std::string > &filenames)
void set_streamed_histo(const ::std::string &value)