CMS 3D CMS Logo

fastHadd.cc
Go to the documentation of this file.
1 
80 #include <sys/types.h>
81 #include <sys/stat.h>
82 #include <fcntl.h>
83 #include <vector>
84 #include <set>
85 #include <string>
86 #include <iostream>
87 #include <memory>
88 #include <thread>
89 #include <mutex>
90 #include <list>
92 #include <google/protobuf/io/coded_stream.h>
93 #include <google/protobuf/io/gzip_stream.h>
94 #include <google/protobuf/io/zero_copy_stream_impl.h>
95 #include <TROOT.h>
96 #include <TFile.h>
97 #include <TBufferFile.h>
98 #include <TObject.h>
99 #include <TObjString.h>
100 #include <TH1.h>
101 #include <TProfile.h>
102 #include <TKey.h>
103 #include <TClass.h>
104 
105 #include <sys/prctl.h>
106 #include <sys/wait.h>
107 #include <csignal>
108 
// Emit a trace message on std::cout when the global verbosity `debug` is at
// least `x`. `msg` is spliced into a stream expression, so it may chain several
// `<<` operands. The do/while(0) wrapper makes the macro expand to exactly one
// statement, so it can be used in an unbraced if/else without capturing the
// `else` branch.
#define DEBUG(x, msg)                              \
  do {                                             \
    if (debug >= (x))                              \
      std::cout << "DEBUG: " << msg << std::flush; \
  } while (0)

// Global verbosity level read by DEBUG(); main() raises it once per
// --debug/-d flag and resets it to zero on --no-debug.
int debug = 0;
114 
115 struct MicroME {
116  MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags = 0)
117  : obj(o), dirname(dir), objname(obj), flags(flags) {}
118 
119  mutable TObject *obj;
120 
123 
124  uint32_t flags;
125 
126  bool operator<(const MicroME &rhs) const {
127  const MicroME &lhs = *this;
128  int diff = lhs.dirname.compare(rhs.dirname);
129  return (diff < 0 ? true : diff == 0 ? lhs.objname < rhs.objname : false);
130  };
131 
132  void add(TObject *obj_to_add) const {
133  DEBUG(1, "Merging: " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl);
134 
135  if (dynamic_cast<TH1 *>(obj) && dynamic_cast<TH1 *>(obj_to_add)) {
136  dynamic_cast<TH1 *>(obj)->Add(dynamic_cast<TH1 *>(obj_to_add));
137  } else if (dynamic_cast<TObjString *>(obj) && dynamic_cast<TObjString *>(obj_to_add)) {
138  } else {
139  DEBUG(1, "Cannot merge (different types): " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl);
140  }
141  };
142 
143  const std::string fullname() const { return dirname + '/' + objname; };
144 };
145 
146 using MEStore = std::set<MicroME>;
147 
149 
151 
152 using google::protobuf::io::ArrayInputStream;
153 using google::protobuf::io::CodedInputStream;
154 using google::protobuf::io::FileInputStream;
155 using google::protobuf::io::FileOutputStream;
156 using google::protobuf::io::GzipInputStream;
157 using google::protobuf::io::GzipOutputStream;
158 
162 inline TObject *extractNextObject(TBufferFile &buf) {
163  if (buf.Length() == buf.BufferSize())
164  return nullptr;
165 
166  buf.InitMap();
167  return reinterpret_cast<TObject *>(buf.ReadObjectAny(nullptr));
168 }
169 
170 static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj) {
171  size_t slash = h.full_pathname().rfind('/');
172  size_t dirpos = (slash == std::string::npos ? 0 : slash);
173  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
174  dirname.assign(h.full_pathname(), 0, dirpos);
175  objname.assign(h.full_pathname(), namepos, std::string::npos);
176  TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
177  buf.Reset();
179  if (!*obj) {
180  std::cerr << "Error reading element: " << h.full_pathname() << std::endl;
181  }
182 }
183 
184 void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd) {
185  FileOutputStream out_stream(out_fd);
187  options.format = GzipOutputStream::GZIP;
188  options.compression_level = 2;
189  GzipOutputStream gzip_stream(&out_stream, options);
190  dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
191 
192  // make sure we flush before close
193  gzip_stream.Close();
194  out_stream.Close();
195 }
196 
197 void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename) {
198  DEBUG(1, "Writing file" << std::endl);
199 
200  int out_fd =
201  ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
202 
203  writeMessageFD(dqmstore_output_msg, out_fd);
204  ::close(out_fd);
205 }
206 
207 void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes) {
208  auto mi = micromes.begin();
209  auto me = micromes.end();
210 
211  DEBUG(1, "Streaming ROOT objects" << std::endl);
212  for (; mi != me; ++mi) {
213  dqmstorepb::ROOTFilePB::Histo *h = dqmstore_output_msg.add_histo();
214  DEBUG(2, "Streaming ROOT object " << mi->fullname() << "\n");
215  h->set_full_pathname(mi->fullname());
216  TBufferFile buffer(TBufferFile::kWrite);
217  buffer.WriteObject(mi->obj);
218  h->set_size(buffer.Length());
219  h->set_flags(mi->flags);
220  h->set_streamed_histo((const void *)buffer.Buffer(), buffer.Length());
221  delete mi->obj;
222  }
223 }
224 
225 void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes) {
226  DEBUG(1, "Processing directory " << curdir << "\n");
227  file->cd(curdir.c_str());
228  TKey *key;
229  TIter next(gDirectory->GetListOfKeys());
230  while ((key = (TKey *)next())) {
231  TObject *obj = key->ReadObj();
232  if (dynamic_cast<TDirectory *>(obj)) {
234  subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2);
235  subdir += curdir;
236  if (!curdir.empty())
237  subdir += '/';
238  subdir += obj->GetName();
239  processDirectory(file, subdir, micromes);
240  } else if ((dynamic_cast<TH1 *>(obj)) || (dynamic_cast<TObjString *>(obj))) {
241  if (dynamic_cast<TH1 *>(obj)) {
242  dynamic_cast<TH1 *>(obj)->SetDirectory(nullptr);
243  }
244 
245  DEBUG(2, curdir << "/" << obj->GetName() << "\n");
246  MicroME mme(obj, curdir, obj->GetName());
247 
248  micromes.insert(mme);
249  }
250  }
251 }
252 
253 int encodeFile(const std::string &output_filename, const std::vector<std::string> &filenames) {
254  assert(filenames.size() == 1);
255  TFile input(filenames[0].c_str());
256  DEBUG(0, "Encoding file " << filenames[0] << std::endl);
257  MEStore micromes;
258  dqmstorepb::ROOTFilePB dqmstore_message;
259 
260  processDirectory(&input, "", micromes);
261  fillMessage(dqmstore_message, micromes);
262  writeMessage(dqmstore_message, output_filename);
263 
264  return 0;
265 }
266 
267 int convertFile(const std::string &output_filename, const std::vector<std::string> &filenames) {
268  assert(filenames.size() == 1);
269  TFile output(output_filename.c_str(), "RECREATE");
270  DEBUG(0, "Converting file " << filenames[0] << std::endl);
271  dqmstorepb::ROOTFilePB dqmstore_message;
272 
273  int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
274  FileInputStream fin(filedescriptor);
275  GzipInputStream input(&fin);
276  CodedInputStream input_coded(&input);
277  input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
278  if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
279  std::cout << "Fatal Error opening file " << filenames[0] << std::endl;
280  return ERR_NOFILE;
281  }
282  ::close(filedescriptor);
283 
284  for (int i = 0; i < dqmstore_message.histo_size(); i++) {
285  const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i);
286  DEBUG(1, h.full_pathname() << std::endl);
287  DEBUG(1, h.size() << std::endl);
288  TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
289  buf.Reset();
290  TObject *obj = extractNextObject(buf);
291  std::string path, objname;
292  get_info(h, path, objname, &obj);
293  gDirectory->cd("/");
294  // Find the first path component.
295  size_t start = 0;
296  size_t end = path.find('/', start);
297  if (end == std::string::npos)
298  end = path.size();
299  while (true) {
301  if (!gDirectory->Get(part.c_str()))
302  gDirectory->mkdir(part.c_str());
303  gDirectory->cd(part.c_str());
304  // Stop if we reached the end, ignoring any trailing '/'.
305  if (end + 1 >= path.size())
306  break;
307  // Find the next path component.
308  start = end + 1;
309  end = path.find('/', start);
310  if (end == std::string::npos)
311  end = path.size();
312  }
313  obj->Write();
314  DEBUG(1, obj->GetName() << std::endl);
315  }
316  output.Close();
317  return 0;
318 }
319 
320 int dumpFiles(const std::vector<std::string> &filenames) {
321  assert(!filenames.empty());
322  for (int i = 0, e = filenames.size(); i != e; ++i) {
323  DEBUG(0, "Dumping file " << filenames[i] << std::endl);
324  dqmstorepb::ROOTFilePB dqmstore_message;
325 
326  int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
327  FileInputStream fin(filedescriptor);
328  GzipInputStream input(&fin);
329  CodedInputStream input_coded(&input);
330  input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
331  if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
332  std::cout << "Fatal Error opening file " << filenames[0] << std::endl;
333  return ERR_NOFILE;
334  }
335  ::close(filedescriptor);
336 
337  for (int i = 0; i < dqmstore_message.histo_size(); i++) {
338  const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i);
339  DEBUG(1, h.full_pathname() << std::endl);
340  DEBUG(1, h.size() << std::endl);
341  TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
342  buf.Reset();
343  TObject *obj = extractNextObject(buf);
344  DEBUG(1, obj->GetName() << std::endl);
345  DEBUG(1, "Flags: " << h.flags() << std::endl);
346  }
347  }
348 
349  return 0;
350 }
351 
352 int addFile(MEStore &micromes, int fd) {
353  dqmstorepb::ROOTFilePB dqmstore_msg;
354 
355  FileInputStream fin(fd);
356  GzipInputStream input(&fin);
357  CodedInputStream input_coded(&input);
358  input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
359  if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
360  std::cout << "Fatal decoding stream: " << fd << std::endl;
361  return ERR_NOFILE;
362  }
363 
364  auto hint = micromes.begin();
365  for (int i = 0; i < dqmstore_msg.histo_size(); i++) {
367  std::string objname;
368  TObject *obj = nullptr;
369  const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_msg.histo(i);
370  get_info(h, path, objname, &obj);
371 
372  MicroME mme(nullptr, path, objname, h.flags());
373  auto ir = micromes.insert(hint, mme);
374  if (ir->obj != nullptr) {
375  // new element was not added
376  // so we merge
377 
378  ir->add(obj);
379  delete obj;
380  DEBUG(2, "Merged MicroME " << mme.fullname() << std::endl);
381  } else {
382  ir->obj = obj;
383  DEBUG(2, "Inserted MicroME " << mme.fullname() << std::endl);
384  }
385 
386  hint = ir;
387  ++hint;
388  }
389 
390  return 0;
391 }
392 
393 // The idea is to preload root library (before forking).
394 // Which is significant for performance and especially memory usage,
395 // because root aakes a long time to init (and somehow manages to launch a subshell).
397  // write a single histogram
398  TH1F obj_th1f("preload_th1f", "preload_th1f", 2, 0, 1);
399 
400  TBufferFile write_buffer(TBufferFile::kWrite);
401  write_buffer.WriteObject(&obj_th1f);
402 
403  dqmstorepb::ROOTFilePB preload_file;
404  dqmstorepb::ROOTFilePB::Histo *hw = preload_file.add_histo();
405  hw->set_size(write_buffer.Length());
406  hw->set_flags(0);
407  hw->set_streamed_histo((const void *)write_buffer.Buffer(), write_buffer.Length());
408 
409  // now load this th1f
410  const dqmstorepb::ROOTFilePB::Histo &hr = preload_file.histo(0);
412  std::string objname;
413  TObject *obj = nullptr;
414  get_info(hr, path, objname, &obj);
415  delete obj;
416 
417  // all done
418 }
419 
420 /* fork_id represents the position in a node (node number). */
421 void addFilesWithFork(int parent_fd,
422  const int fork_id,
423  const int fork_total,
424  const std::vector<std::string> &filenames) {
425  DEBUG(1, "Start process: " << fork_id << " parent: " << (fork_id / 2) << std::endl);
426 
427  std::list<std::pair<int, int> > children;
428 
429  // if this node has a subtree, start it
430  for (int i = 0; i < 2; ++i) {
431  int child_id = fork_id * 2 + i;
432  if (child_id > fork_total)
433  continue;
434 
435  int fd[2];
436  ::pipe(fd);
437 
438  int child_pid = ::fork();
439  if (child_pid == 0) {
440  ::prctl(PR_SET_PDEATHSIG, SIGKILL);
441  ::close(fd[0]); // close read end
442 
443  addFilesWithFork(fd[1], child_id, fork_total, filenames);
444  ::close(fd[1]);
445 
446  ::_exit(0);
447  } else {
448  ::close(fd[1]); // close write end
449  children.push_back(std::make_pair(fd[0], child_pid));
450  }
451  }
452 
453  // merge all my files
454  MEStore microme;
455 
456  // select the filenames to process
457  // with threads=1, this just selects all the files
458  for (unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
459  const std::string &file = filenames[fi];
460  DEBUG(1, "Adding file " << file << std::endl);
461 
462  int filedescriptor;
463  if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) {
464  std::cout << "Fatal Error opening file " << file << std::endl;
465 
466  exit(ERR_NOFILE);
467  }
468 
469  addFile(microme, filedescriptor);
470  ::close(filedescriptor);
471  }
472 
473  // merge all children
474  for (auto &chpair : children) {
475  int fd = chpair.first;
476  addFile(microme, fd);
477  ::close(fd);
478 
479  // collect the child, not necessary, but avoids <defunct>
480  int status;
481  ::waitpid(chpair.second, &status, 0);
482  }
483 
484  // output everything to fd
485  dqmstorepb::ROOTFilePB dqmstore_output_msg;
486  fillMessage(dqmstore_output_msg, microme);
487  writeMessageFD(dqmstore_output_msg, parent_fd);
488 };
489 
490 int addFiles(const std::string &output_filename, const std::vector<std::string> &filenames, int nthreads) {
491  tryRootPreload();
492 
493  DEBUG(1, "Writing file" << std::endl);
494  int out_fd =
495  ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
496 
497  addFilesWithFork(out_fd, 1, nthreads, filenames);
498  ::close(out_fd);
499 
500  return 0;
501 }
502 
503 static int showusage() {
504  static const std::string app_name("fasthadd");
505 
506  std::cerr << "Usage: " << app_name << " [--[no-]debug] TASK OPTIONS\n\n " << app_name
507  << " [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n " << app_name
508  << " [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " << app_name
509  << " [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " << app_name << " [OPTIONS] dump [DAT FILE...]\n ";
510  return ERR_BADCFG;
511 }
512 
513 int main(int argc, char *argv[]) {
514  int arg;
515  int ret = 0;
516  int jobs = 1;
518  std::vector<std::string> filenames;
519  TaskType task;
520 
521  filenames.reserve(argc);
522 
523  for (arg = 1; arg < argc; ++arg) {
524  if (!strcmp(argv[arg], "--no-debug"))
525  debug = 0;
526  else if (!strcmp(argv[arg], "--debug") || !strcmp(argv[arg], "-d"))
527  debug++;
528  else
529  break;
530  }
531 
532  if (arg < argc) {
533  if (!strcmp(argv[arg], "add")) {
534  ++arg;
535  task = TASK_ADD;
536  } else if (!strcmp(argv[arg], "dump")) {
537  ++arg;
538  task = TASK_DUMP;
539  } else if (!strcmp(argv[arg], "convert")) {
540  ++arg;
541  task = TASK_CONVERT;
542  } else if (!strcmp(argv[arg], "encode")) {
543  ++arg;
544  task = TASK_ENCODE;
545  } else {
546  std::cerr << "Unknown action: " << argv[arg] << std::endl;
547  return showusage();
548  }
549  } else {
550  std::cerr << "Not enough arguments\n";
551  return showusage();
552  }
553 
554  if (task == TASK_ADD) {
555  if ((arg != argc) && (strcmp(argv[arg], "-j") == 0)) {
556  jobs = atoi(argv[arg + 1]);
557 
558  if ((jobs < 1) || (jobs > 128)) {
559  std::cerr << "Invalid argument for -j\n";
560  return showusage();
561  };
562 
563  arg += 2;
564  }
565  }
566 
567  if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
568  if (arg == argc) {
569  std::cerr << "add|convert|encode actions requires a -o option to be set\n";
570  return showusage();
571  }
572  if (!strcmp(argv[arg], "-o")) {
573  if (arg < argc - 1) {
574  output_file = argv[++arg];
575  } else {
576  std::cerr << " -o option requires a value\n";
577  return showusage();
578  }
579  }
580  } else if (task == TASK_DUMP) {
581  if (arg == argc) {
582  std::cerr << "Missing input file(s)\n";
583  return showusage();
584  }
585  for (; arg < argc; ++arg) {
586  filenames.emplace_back(argv[arg]);
587  }
588  }
589 
590  if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
591  if (++arg == argc) {
592  std::cerr << "Missing input file(s)\n";
593  return showusage();
594  }
595  for (; arg < argc; ++arg) {
596  filenames.emplace_back(argv[arg]);
597  }
598  }
599 
600  if (task == TASK_ADD)
601  ret = addFiles(output_file, filenames, jobs);
602  else if (task == TASK_DUMP)
603  ret = dumpFiles(filenames);
604  else if (task == TASK_CONVERT)
605  ret = convertFile(output_file, filenames);
606  else if (task == TASK_ENCODE)
607  ret = encodeFile(output_file, filenames);
608 
609  google::protobuf::ShutdownProtobufLibrary();
610  return ret;
611 }
Definition: start.py:1
bool operator<(const MicroME &rhs) const
Definition: fastHadd.cc:126
void set_flags(uint32_t value)
const std::string dirname
Definition: fastHadd.cc:121
void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes)
Definition: fastHadd.cc:225
static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj)
Definition: fastHadd.cc:170
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:162
ret
prodAgent to be discontinued
ErrType
Definition: fastHadd.cc:150
TObject * obj
Definition: fastHadd.cc:119
assert(be >=bs)
A arg
Definition: Factorize.h:31
static std::string const input
Definition: EdmProvDump.cc:50
void set_streamed_histo(ArgT0 &&arg0, ArgT... args)
void set_size(uint32_t value)
const std::string fullname() const
Definition: fastHadd.cc:143
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
def pipe(cmdline, input=None)
Definition: pipe.py:5
int main(int argc, char *argv[])
Definition: fastHadd.cc:513
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int nthreads
void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd)
Definition: fastHadd.cc:184
int debug
Definition: fastHadd.cc:113
TaskType
Definition: fastHadd.cc:148
int encodeFile(const std::string &output_filename, const std::vector< std::string > &filenames)
Definition: fastHadd.cc:253
void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes)
Definition: fastHadd.cc:207
const ::dqmstorepb::ROOTFilePB_Histo & histo(int index) const
part
Definition: HCALResponse.h:20
void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename)
Definition: fastHadd.cc:197
void add(TObject *obj_to_add) const
Definition: fastHadd.cc:132
const std::string objname
Definition: fastHadd.cc:122
int convertFile(const std::string &output_filename, const std::vector< std::string > &filenames)
Definition: fastHadd.cc:267
::dqmstorepb::ROOTFilePB_Histo * add_histo()
void tryRootPreload()
Definition: fastHadd.cc:396
MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags=0)
Definition: fastHadd.cc:116
std::set< MicroME > MEStore
Definition: fastHadd.cc:146
uint32_t flags
Definition: fastHadd.cc:124
int addFiles(const std::string &output_filename, const std::vector< std::string > &filenames, int nthreads)
Definition: fastHadd.cc:490
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
Definition: output.py:1
static int showusage()
Definition: fastHadd.cc:503
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector< std::string > &filenames)
Definition: fastHadd.cc:421
fd
Definition: ztee.py:136
int dumpFiles(const std::vector< std::string > &filenames)
Definition: fastHadd.cc:320
#define DEBUG(x, msg)
Definition: fastHadd.cc:109
def exit(msg="")