CMS 3D CMS Logo

fastHadd.cc
Go to the documentation of this file.
1 
80 #include <sys/types.h>
81 #include <sys/stat.h>
82 #include <fcntl.h>
83 #include <vector>
84 #include <set>
85 #include <string>
86 #include <iostream>
87 #include <memory>
88 #include <thread>
89 #include <mutex>
90 #include <list>
92 #include <google/protobuf/io/coded_stream.h>
93 #include <google/protobuf/io/gzip_stream.h>
94 #include <google/protobuf/io/zero_copy_stream_impl.h>
95 #include <TROOT.h>
96 #include <TFile.h>
97 #include <TBufferFile.h>
98 #include <TObject.h>
99 #include <TObjString.h>
100 #include <TH1.h>
101 #include <TProfile.h>
102 #include <TKey.h>
103 #include <TClass.h>
104 
105 #include <sys/prctl.h>
106 #include <sys/wait.h>
107 #include <csignal>
108 
// Print "DEBUG: " followed by the stream expression `msg` to std::cout when the
// global verbosity `debug` is at least `x`.
// Wrapped in do { } while (0) so that `DEBUG(...);` is exactly one statement:
// the previous bare `if` form would capture a following `else` (dangling else)
// when used as the body of an unbraced if.
#define DEBUG(x, msg)                                                   \
  do {                                                                  \
    if (debug >= (x)) std::cout << "DEBUG: " << msg << std::flush;      \
  } while (0)

// Verbosity level: incremented by each --debug / -d flag, reset by --no-debug (see main()).
int debug = 0;
112 
113 struct MicroME {
115  TObject *o,
116  const std::string& dir,
117  const std::string& obj,
118  uint32_t flags = 0)
119  : obj(o), dirname(dir), objname(obj), flags(flags) {}
120 
121  mutable TObject * obj;
122 
125 
126  uint32_t flags;
127 
128  bool operator<(const MicroME &rhs) const {
129  const MicroME &lhs = *this;
130  int diff = lhs.dirname.compare(rhs.dirname);
131  return (diff < 0 ? true
132  : diff == 0 ? lhs.objname < rhs.objname : false);
133  };
134 
135  void add(TObject *obj_to_add) const {
136  DEBUG(1, "Merging: " << obj->GetName() <<
137  " << " << obj_to_add->GetName() << std::endl);
138 
139  if (dynamic_cast<TH1 *>(obj) && dynamic_cast<TH1 *>(obj_to_add)) {
140  dynamic_cast<TH1 *>(obj)->Add(dynamic_cast<TH1 *>(obj_to_add));
141  } else if (dynamic_cast<TObjString *>(obj) && dynamic_cast<TObjString *>(obj_to_add)) {
142 
143 
144  } else {
145  DEBUG(1, "Cannot merge (different types): " << obj->GetName() <<
146  " << " << obj_to_add->GetName() << std::endl);
147  }
148  };
149 
150  const std::string fullname() const {
151  return dirname + '/' + objname;
152  };
153 
154 };
155 
// Merge store: at most one MicroME per (dirname, objname), kept in
// MicroME::operator< order.
156 using MEStore = std::set<MicroME>;
157 
// Sub-command selected on the command line (add / dump / convert / encode);
// assigned in main().
158 enum TaskType {
163 };
164 
// Error codes (e.g. ERR_NOFILE, ERR_BADCFG) returned by the tasks and by
// showusage(); main() returns them as the process exit status.
165 enum ErrType {
168 };
169 
170 using google::protobuf::io::FileInputStream;
171 using google::protobuf::io::FileOutputStream;
172 using google::protobuf::io::GzipInputStream;
173 using google::protobuf::io::GzipOutputStream;
174 using google::protobuf::io::CodedInputStream;
175 using google::protobuf::io::ArrayInputStream;
176 
177 
181 inline TObject * extractNextObject(TBufferFile &buf) {
182  if (buf.Length() == buf.BufferSize())
183  return nullptr;
184 
185  buf.InitMap();
186  return reinterpret_cast<TObject *>(buf.ReadObjectAny(nullptr));
187 }
188 
192  TObject ** obj) {
// Splits h.full_pathname() at its last '/':
// - `dirname` gets everything before the slash ("" when there is none).
// - `objname` gets everything after it (the whole path when there is none).
// Then decodes h.streamed_histo() into a newly created object returned via *obj.
// On a decode failure *obj is nullptr and an error is printed to stderr;
// callers must check for that.
193 
194  size_t slash = h.full_pathname().rfind('/');
195  size_t dirpos = (slash == std::string::npos ? 0 : slash);
196  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
197  dirname.assign(h.full_pathname(), 0, dirpos);
198  objname.assign(h.full_pathname(), namepos, std::string::npos);
// Read directly from the protobuf-owned bytes. kFALSE presumably means the
// buffer does not adopt (free) that memory — verify against the TBufferFile docs.
// NOTE(review): the length comes from h.size(), not h.streamed_histo().size();
// a corrupt record whose size field exceeds its payload would make ROOT read
// past the end of the string — consider validating.
199  TBufferFile buf(TBufferFile::kRead, h.size(),
200  (void*)h.streamed_histo().data(),
201  kFALSE);
202  buf.Reset();
203  *obj = extractNextObject(buf);
204  if (!*obj) {
205  std::cerr << "Error reading element: " << h.full_pathname() << std::endl;
206  }
207 }
208 
// Serialize `dqmstore_output_msg` as a gzip-format stream (compression level 2)
// onto the already-open descriptor `out_fd`.
// Ownership: out_stream.Close() closes out_fd itself, because protobuf's
// FileOutputStream::Close() closes the underlying descriptor. Callers must not
// close it again.
// NOTE(review): the results of SerializeToZeroCopyStream() and of both Close()
// calls are ignored, so write errors (e.g. a full disk or a closed pipe) are
// not reported.
209 void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd) {
210  FileOutputStream out_stream(out_fd);
212  options.format = GzipOutputStream::GZIP;
213  options.compression_level = 2;
214  GzipOutputStream gzip_stream(&out_stream,
215  options);
216  dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
217 
218  // make sure we flush before close: finish the gzip stream first, then the file stream
219  gzip_stream.Close();
220  out_stream.Close();
221 }
222 
223 void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg,
224  const std::string &output_filename) {
225 
226  DEBUG(1, "Writing file" << std::endl);
227 
228  int out_fd = ::open(output_filename.c_str(),
229  O_WRONLY | O_CREAT | O_TRUNC,
230  S_IRUSR | S_IWUSR |
231  S_IRGRP | S_IWGRP |
232  S_IROTH);
233 
234  writeMessageFD(dqmstore_output_msg, out_fd);
235  ::close(out_fd);
236 }
237 
238 
239 void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg,
240  const MEStore & micromes) {
241  auto mi = micromes.begin();
242  auto me = micromes.end();
243 
244  DEBUG(1, "Streaming ROOT objects" << std::endl);
245  for (; mi != me; ++mi) {
246  dqmstorepb::ROOTFilePB::Histo* h = dqmstore_output_msg.add_histo();
247  DEBUG(2, "Streaming ROOT object " << mi->fullname() << "\n");
248  h->set_full_pathname(mi->fullname());
249  TBufferFile buffer(TBufferFile::kWrite);
250  buffer.WriteObject(mi->obj);
251  h->set_size(buffer.Length());
252  h->set_flags(mi->flags);
253  h->set_streamed_histo((const void*)buffer.Buffer(),
254  buffer.Length());
255  delete mi->obj;
256  }
257 }
258 
259 
260 void processDirectory(TFile *file,
261  const std::string& curdir,
262  MEStore& micromes) {
263  DEBUG(1, "Processing directory " << curdir << "\n");
264  file->cd(curdir.c_str());
265  TKey *key;
266  TIter next (gDirectory->GetListOfKeys());
267  while ((key = (TKey *) next())) {
268  TObject * obj = key->ReadObj();
269  if (dynamic_cast<TDirectory *>(obj)) {
270  std::string subdir;
271  subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2);
272  subdir += curdir;
273  if (! curdir.empty())
274  subdir += '/';
275  subdir += obj->GetName();
276  processDirectory(file, subdir, micromes);
277  } else if ((dynamic_cast<TH1 *>(obj)) || (dynamic_cast<TObjString *>(obj))) {
278  if (dynamic_cast<TH1 *>(obj)) {
279  dynamic_cast<TH1 *>(obj)->SetDirectory(nullptr);
280  }
281 
282  DEBUG(2, curdir << "/" << obj->GetName() << "\n");
283  MicroME mme(obj, curdir, obj->GetName());
284 
285  micromes.insert(mme);
286  }
287  }
288 }
289 
290 
291 int encodeFile(const std::string &output_filename,
292  const std::vector<std::string> &filenames) {
293  assert(filenames.size() == 1);
294  TFile input(filenames[0].c_str());
295  DEBUG(0, "Encoding file " << filenames[0] << std::endl);
296  MEStore micromes;
297  dqmstorepb::ROOTFilePB dqmstore_message;
298 
299  processDirectory(&input, "", micromes);
300  fillMessage(dqmstore_message, micromes);
301  writeMessage(dqmstore_message, output_filename);
302 
303  return 0;
304 }
305 
306 int convertFile(const std::string &output_filename,
307  const std::vector<std::string> &filenames) {
308  assert(filenames.size() == 1);
309  TFile output(output_filename.c_str(), "RECREATE");
310  DEBUG(0, "Converting file " << filenames[0] << std::endl);
311  dqmstorepb::ROOTFilePB dqmstore_message;
312 
313  int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
314  FileInputStream fin(filedescriptor);
315  GzipInputStream input(&fin);
316  CodedInputStream input_coded(&input);
317  input_coded.SetTotalBytesLimit(1024*1024*1024, -1);
318  if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
319  std::cout << "Fatal Error opening file "
320  << filenames[0] << std::endl;
321  return ERR_NOFILE;
322  }
323  ::close(filedescriptor);
324 
325  for (int i = 0; i < dqmstore_message.histo_size(); i++) {
326  const dqmstorepb::ROOTFilePB::Histo& h = dqmstore_message.histo(i);
327  DEBUG(1, h.full_pathname() << std::endl);
328  DEBUG(1, h.size() << std::endl);
329  TBufferFile buf(TBufferFile::kRead, h.size(),
330  (void*)h.streamed_histo().data(),
331  kFALSE);
332  buf.Reset();
333  TObject *obj = extractNextObject(buf);
335  get_info(h, path, objname, &obj);
336  gDirectory->cd("/");
337  // Find the first path component.
338  size_t start = 0;
339  size_t end = path.find('/', start);
340  if (end == std::string::npos)
341  end = path.size();
342  while (true)
343  {
344  std::string part(path, start, end-start);
345  if (! gDirectory->Get(part.c_str()))
346  gDirectory->mkdir(part.c_str());
347  gDirectory->cd(part.c_str());
348  // Stop if we reached the end, ignoring any trailing '/'.
349  if (end+1 >= path.size())
350  break;
351  // Find the next path component.
352  start = end+1;
353  end = path.find('/', start);
354  if (end == std::string::npos)
355  end = path.size();
356  }
357  obj->Write();
358  DEBUG(1, obj->GetName() << std::endl);
359  }
360  output.Close();
361  return 0;
362 }
363 
364 int dumpFiles(const std::vector<std::string> &filenames) {
365  assert(!filenames.empty());
366  for (int i = 0, e = filenames.size(); i != e; ++i) {
367  DEBUG(0, "Dumping file " << filenames[i] << std::endl);
368  dqmstorepb::ROOTFilePB dqmstore_message;
369 
370  int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
371  FileInputStream fin(filedescriptor);
372  GzipInputStream input(&fin);
373  CodedInputStream input_coded(&input);
374  input_coded.SetTotalBytesLimit(1024*1024*1024, -1);
375  if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
376  std::cout << "Fatal Error opening file "
377  << filenames[0] << std::endl;
378  return ERR_NOFILE;
379  }
380  ::close(filedescriptor);
381 
382  for (int i = 0; i < dqmstore_message.histo_size(); i++) {
383  const dqmstorepb::ROOTFilePB::Histo& h = dqmstore_message.histo(i);
384  DEBUG(1, h.full_pathname() << std::endl);
385  DEBUG(1, h.size() << std::endl);
386  TBufferFile buf(TBufferFile::kRead, h.size(),
387  (void*)h.streamed_histo().data(),
388  kFALSE);
389  buf.Reset();
390  TObject *obj = extractNextObject(buf);
391  DEBUG(1, obj->GetName() << std::endl);
392  DEBUG(1, "Flags: " << h.flags() << std::endl);
393  }
394  }
395 
396  return 0;
397 }
398 
399 int addFile(MEStore& micromes, int fd) {
400  dqmstorepb::ROOTFilePB dqmstore_msg;
401 
402  FileInputStream fin(fd);
403  GzipInputStream input(&fin);
404  CodedInputStream input_coded(&input);
405  input_coded.SetTotalBytesLimit(1024*1024*1024, -1);
406  if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
407  std::cout << "Fatal decoding stream: "
408  << fd << std::endl;
409  return ERR_NOFILE;
410  }
411 
412  auto hint = micromes.begin();
413  for (int i = 0; i < dqmstore_msg.histo_size(); i++) {
416  TObject *obj = nullptr;
417  const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_msg.histo(i);
418  get_info(h, path, objname, &obj);
419 
420  MicroME mme(nullptr, path, objname, h.flags());
421  auto ir = micromes.insert(hint, mme);
422  if (ir->obj != nullptr) {
423  // new element was not added
424  // so we merge
425 
426  ir->add(obj);
427  delete obj;
428  DEBUG(2, "Merged MicroME " << mme.fullname() << std::endl);
429  } else {
430  ir->obj = obj;
431  DEBUG(2, "Inserted MicroME " << mme.fullname() << std::endl);
432  }
433 
434  hint = ir;
435  ++hint;
436  }
437 
438  return 0;
439 }
440 
441 // The idea is to preload the ROOT library before forking.
442 // This matters for performance and especially for memory usage,
443 // because ROOT takes a long time to initialize (and somehow manages to launch a subshell).
// Round-trip one tiny TH1F through the same path used for real data:
// TBufferFile -> ROOTFilePB::Histo -> get_info(). This exercises the ROOT and
// protobuf machinery once in this process; addFiles() calls this before
// addFilesWithFork() starts forking.
445  // write a single histogram into a ROOT streaming buffer
446  TH1F obj_th1f("preload_th1f", "preload_th1f", 2, 0, 1);
447 
448  TBufferFile write_buffer(TBufferFile::kWrite);
449  write_buffer.WriteObject(&obj_th1f);
450 
// Wrap the streamed bytes in a protobuf Histo record. full_pathname is never
// set, so get_info() below yields an empty dirname/objname; only the decoding
// matters here.
451  dqmstorepb::ROOTFilePB preload_file;
452  dqmstorepb::ROOTFilePB::Histo* hw = preload_file.add_histo();
453  hw->set_size(write_buffer.Length());
454  hw->set_flags(0);
455  hw->set_streamed_histo((const void*)write_buffer.Buffer(), write_buffer.Length());
456 
457  // now load this th1f back through get_info()
458  const dqmstorepb::ROOTFilePB::Histo &hr = preload_file.histo(0);
461  TObject *obj = nullptr;
462  get_info(hr, path, objname, &obj);
// The decoded copy is not needed; free it.
463  delete obj;
464 
465  // all done
466 }
467 
468 /* fork_id represents the position in a node (node number). */
469 void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector<std::string>& filenames) {
470  DEBUG(1, "Start process: " << fork_id << " parent: " << (fork_id / 2) << std::endl);
471 
472  std::list<std::pair<int, int> > children;
473 
474  // if this node has a subtree, start it
475  for (int i = 0; i < 2; ++i) {
476  int child_id = fork_id*2 + i;
477  if (child_id > fork_total)
478  continue;
479 
480  int fd[2];
481  ::pipe(fd);
482 
483  int child_pid = ::fork();
484  if (child_pid == 0) {
485  ::prctl(PR_SET_PDEATHSIG, SIGKILL);
486  ::close(fd[0]); // close read end
487 
488  addFilesWithFork(fd[1], child_id, fork_total, filenames);
489  ::close(fd[1]);
490 
491  ::_exit(0);
492  } else {
493  ::close(fd[1]); // close write end
494  children.push_back(std::make_pair(fd[0], child_pid));
495  }
496  }
497 
498  // merge all my files
499  MEStore microme;
500 
501  // select the filenames to process
502  // with threads=1, this just selects all the files
503  for (unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
504  const std::string& file = filenames[fi];
505  DEBUG(1, "Adding file " << file << std::endl);
506 
507  int filedescriptor;
508  if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) {
509  std::cout << "Fatal Error opening file "
510  << file << std::endl;
511 
512  exit(ERR_NOFILE);
513  }
514 
515  addFile(microme, filedescriptor);
516  ::close(filedescriptor);
517  }
518 
519  // merge all children
520  for (auto& chpair : children) {
521  int fd = chpair.first;
522  addFile(microme, fd);
523  ::close(fd);
524 
525  // collect the child, not necessary, but avoids <defunct>
526  int status;
527  ::waitpid(chpair.second, &status, 0);
528  }
529 
530  // output everything to fd
531  dqmstorepb::ROOTFilePB dqmstore_output_msg;
532  fillMessage(dqmstore_output_msg, microme);
533  writeMessageFD(dqmstore_output_msg, parent_fd);
534 };
535 
536 int addFiles(const std::string &output_filename,
537  const std::vector<std::string> &filenames,
538  int nthreads) {
539 
540  tryRootPreload();
541 
542  DEBUG(1, "Writing file" << std::endl);
543  int out_fd = ::open(output_filename.c_str(),
544  O_WRONLY | O_CREAT | O_TRUNC,
545  S_IRUSR | S_IWUSR |
546  S_IRGRP | S_IWGRP |
547  S_IROTH);
548 
549  addFilesWithFork(out_fd, 1, nthreads, filenames);
550  ::close(out_fd);
551 
552  return 0;
553 }
554 
// Print the command-line synopsis to stderr and return ERR_BADCFG, so that
// main() can simply `return showusage();` after reporting an argument error.
555 static int
557 {
558  static const std::string app_name("fasthadd");
559 
560  std::cerr << "Usage: " << app_name
561  << " [--[no-]debug] TASK OPTIONS\n\n "
562  << app_name << " [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n "
563  << app_name << " [OPTIONS] convert -o ROOT_FILE DAT_FILE\n "
564  << app_name << " [OPTIONS] encode -o DAT_FILE ROOT_FILE\n "
565  << app_name << " [OPTIONS] dump [DAT FILE...]\n ";
566  return ERR_BADCFG;
567 }
568 
569 int main(int argc, char * argv[]) {
570  int arg;
571  int ret = 0;
572  int jobs = 1;
573  std::string output_file;
574  std::vector<std::string> filenames;
575  TaskType task;
576 
577  filenames.reserve(argc);
578 
579  for (arg = 1; arg < argc; ++arg) {
580  if (! strcmp(argv[arg], "--no-debug"))
581  debug = 0;
582  else if (! strcmp(argv[arg], "--debug")
583  || ! strcmp(argv[arg], "-d"))
584  debug++;
585  else
586  break;
587  }
588 
589  if (arg < argc) {
590  if (! strcmp(argv[arg], "add")) {
591  ++arg;
592  task = TASK_ADD;
593  } else if (! strcmp(argv[arg], "dump")) {
594  ++arg;
595  task = TASK_DUMP;
596  } else if (! strcmp(argv[arg], "convert")) {
597  ++arg;
598  task = TASK_CONVERT;
599  } else if (! strcmp(argv[arg], "encode")) {
600  ++arg;
601  task = TASK_ENCODE;
602  } else {
603  std::cerr << "Unknown action: " << argv[arg] << std::endl;
604  return showusage();
605  }
606  } else {
607  std::cerr << "Not enough arguments\n";
608  return showusage();
609  }
610 
611  if (task == TASK_ADD) {
612  if ((arg != argc) && (strcmp(argv[arg], "-j") == 0)) {
613  jobs = atoi(argv[arg+1]);
614 
615  if ((jobs < 1) || (jobs > 128)) {
616  std::cerr << "Invalid argument for -j\n";
617  return showusage();
618  };
619 
620  arg += 2;
621  }
622  }
623 
624  if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
625  if (arg == argc) {
626  std::cerr << "add|convert|encode actions requires a -o option to be set\n";
627  return showusage();
628  }
629  if (! strcmp(argv[arg], "-o")) {
630  if (arg < argc-1) {
631  output_file = argv[++arg];
632  } else {
633  std::cerr << " -o option requires a value\n";
634  return showusage();
635  }
636  }
637  } else if (task == TASK_DUMP) {
638  if (arg == argc) {
639  std::cerr << "Missing input file(s)\n";
640  return showusage();
641  }
642  for (; arg < argc; ++arg) {
643  filenames.emplace_back(argv[arg]);
644  }
645  }
646 
647  if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
648  if (++arg == argc) {
649  std::cerr << "Missing input file(s)\n";
650  return showusage();
651  }
652  for (; arg < argc; ++arg) {
653  filenames.emplace_back(argv[arg]);
654  }
655  }
656 
657  if (task == TASK_ADD)
658  ret = addFiles(output_file, filenames, jobs);
659  else if (task == TASK_DUMP)
660  ret = dumpFiles(filenames);
661  else if (task == TASK_CONVERT)
662  ret = convertFile(output_file, filenames);
663  else if (task == TASK_ENCODE)
664  ret = encodeFile(output_file, filenames);
665 
666 
667  google::protobuf::ShutdownProtobufLibrary();
668  return ret;
669 }
Definition: start.py:1
const std::string fullname() const
Definition: fastHadd.cc:150
::google::protobuf::uint32 size() const
const ::std::string & full_pathname() const
const std::string dirname
Definition: fastHadd.cc:123
void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes)
Definition: fastHadd.cc:260
static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj)
Definition: fastHadd.cc:189
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:181
ErrType
Definition: fastHadd.cc:165
bool operator<(const MicroME &rhs) const
Definition: fastHadd.cc:128
TObject * obj
Definition: fastHadd.cc:121
void add(TObject *obj_to_add) const
Definition: fastHadd.cc:135
A arg
Definition: Factorize.h:37
void set_flags(::google::protobuf::uint32 value)
const ::std::string & streamed_histo() const
static std::string const input
Definition: EdmProvDump.cc:44
susybsm::HSCParticleRef hr
Definition: classes.h:26
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
#define end
Definition: vmac.h:39
def pipe(cmdline, input=None)
Definition: pipe.py:5
int main(int argc, char *argv[])
Definition: fastHadd.cc:569
void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd)
Definition: fastHadd.cc:209
int debug
Definition: fastHadd.cc:111
TaskType
Definition: fastHadd.cc:158
int encodeFile(const std::string &output_filename, const std::vector< std::string > &filenames)
Definition: fastHadd.cc:291
void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes)
Definition: fastHadd.cc:239
const ::dqmstorepb::ROOTFilePB_Histo & histo(int index) const
void set_size(::google::protobuf::uint32 value)
part
Definition: HCALResponse.h:20
void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename)
Definition: fastHadd.cc:223
const std::string objname
Definition: fastHadd.cc:124
void set_full_pathname(const ::std::string &value)
int convertFile(const std::string &output_filename, const std::vector< std::string > &filenames)
Definition: fastHadd.cc:306
::dqmstorepb::ROOTFilePB_Histo * add_histo()
void tryRootPreload()
Definition: fastHadd.cc:444
MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags=0)
Definition: fastHadd.cc:114
std::set< MicroME > MEStore
Definition: fastHadd.cc:156
uint32_t flags
Definition: fastHadd.cc:126
::google::protobuf::uint32 flags() const
int addFiles(const std::string &output_filename, const std::vector< std::string > &filenames, int nthreads)
Definition: fastHadd.cc:536
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:399
dbl *** dir
Definition: mlp_gen.cc:35
static int showusage()
Definition: fastHadd.cc:556
void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector< std::string > &filenames)
Definition: fastHadd.cc:469
int dumpFiles(const std::vector< std::string > &filenames)
Definition: fastHadd.cc:364
#define DEBUG(x, msg)
Definition: fastHadd.cc:109
void set_streamed_histo(const ::std::string &value)