80 #include <sys/types.h> 92 #include <google/protobuf/io/coded_stream.h> 93 #include <google/protobuf/io/gzip_stream.h> 94 #include <google/protobuf/io/zero_copy_stream_impl.h> 97 #include <TBufferFile.h> 99 #include <TObjString.h> 101 #include <TProfile.h> 105 #include <sys/prctl.h> 106 #include <sys/wait.h> 109 #define DEBUG(x, msg) \ 111 std::cout << "DEBUG: " << msg << std::flush 132 void add(TObject *obj_to_add)
const {
133 DEBUG(1,
"Merging: " <<
obj->GetName() <<
" << " << obj_to_add->GetName() << std::endl);
135 if (dynamic_cast<TH1 *>(
obj) &&
dynamic_cast<TH1 *
>(obj_to_add)) {
136 dynamic_cast<TH1 *
>(
obj)->
Add(dynamic_cast<TH1 *>(obj_to_add));
137 }
else if (dynamic_cast<TObjString *>(
obj) &&
dynamic_cast<TObjString *
>(obj_to_add)) {
139 DEBUG(1,
"Cannot merge (different types): " <<
obj->GetName() <<
" << " << obj_to_add->GetName() << std::endl);
152 using google::protobuf::io::ArrayInputStream;
153 using google::protobuf::io::CodedInputStream;
154 using google::protobuf::io::FileInputStream;
155 using google::protobuf::io::FileOutputStream;
156 using google::protobuf::io::GzipInputStream;
157 using google::protobuf::io::GzipOutputStream;
163 if (
buf.Length() ==
buf.BufferSize())
167 return reinterpret_cast<TObject *
>(
buf.ReadObjectAny(
nullptr));
171 size_t slash =
h.full_pathname().rfind(
'/');
172 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
173 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
174 dirname.assign(
h.full_pathname(), 0, dirpos);
175 objname.assign(
h.full_pathname(), namepos, std::string::npos);
176 TBufferFile
buf(TBufferFile::kRead,
h.size(), (
void *)
h.streamed_histo().data(), kFALSE);
180 std::cerr <<
"Error reading element: " <<
h.full_pathname() << std::endl;
185 FileOutputStream out_stream(out_fd);
187 options.format = GzipOutputStream::GZIP;
189 GzipOutputStream gzip_stream(&out_stream,
options);
190 dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
198 DEBUG(1,
"Writing file" << std::endl);
201 ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
208 auto mi = micromes.begin();
209 auto me = micromes.end();
211 DEBUG(1,
"Streaming ROOT objects" << std::endl);
212 for (; mi !=
me; ++mi) {
214 DEBUG(2,
"Streaming ROOT object " << mi->fullname() <<
"\n");
215 h->set_full_pathname(mi->fullname());
216 TBufferFile
buffer(TBufferFile::kWrite);
217 buffer.WriteObject(mi->obj);
219 h->set_flags(mi->flags);
220 h->set_streamed_histo((
const void *)
buffer.Buffer(),
buffer.Length());
226 DEBUG(1,
"Processing directory " << curdir <<
"\n");
227 file->cd(curdir.c_str());
229 TIter
next(gDirectory->GetListOfKeys());
230 while ((
key = (TKey *)
next())) {
231 TObject *
obj =
key->ReadObj();
232 if (dynamic_cast<TDirectory *>(
obj)) {
234 subdir.reserve(curdir.size() + strlen(
obj->GetName()) + 2);
240 }
else if ((dynamic_cast<TH1 *>(
obj)) || (dynamic_cast<TObjString *>(
obj))) {
241 if (dynamic_cast<TH1 *>(
obj)) {
242 dynamic_cast<TH1 *
>(
obj)->SetDirectory(
nullptr);
245 DEBUG(2, curdir <<
"/" <<
obj->GetName() <<
"\n");
248 micromes.insert(mme);
254 assert(filenames.size() == 1);
255 TFile
input(filenames[0].c_str());
256 DEBUG(0,
"Encoding file " << filenames[0] << std::endl);
268 assert(filenames.size() == 1);
269 TFile
output(output_filename.c_str(),
"RECREATE");
270 DEBUG(0,
"Converting file " << filenames[0] << std::endl);
273 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
274 FileInputStream
fin(filedescriptor);
276 CodedInputStream input_coded(&
input);
277 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
278 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
279 std::cout <<
"Fatal Error opening file " << filenames[0] << std::endl;
282 ::close(filedescriptor);
286 DEBUG(1,
h.full_pathname() << std::endl);
287 DEBUG(1,
h.size() << std::endl);
288 TBufferFile
buf(TBufferFile::kRead,
h.size(), (
void *)
h.streamed_histo().data(), kFALSE);
297 if (
end == std::string::npos)
301 if (!gDirectory->Get(
part.c_str()))
302 gDirectory->mkdir(
part.c_str());
303 gDirectory->cd(
part.c_str());
310 if (
end == std::string::npos)
314 DEBUG(1,
obj->GetName() << std::endl);
320 int dumpFiles(
const std::vector<std::string> &filenames) {
321 assert(!filenames.empty());
322 for (
int i = 0,
e = filenames.size();
i !=
e; ++
i) {
323 DEBUG(0,
"Dumping file " << filenames[
i] << std::endl);
326 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
327 FileInputStream
fin(filedescriptor);
329 CodedInputStream input_coded(&
input);
330 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
331 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
332 std::cout <<
"Fatal Error opening file " << filenames[0] << std::endl;
335 ::close(filedescriptor);
339 DEBUG(1,
h.full_pathname() << std::endl);
340 DEBUG(1,
h.size() << std::endl);
341 TBufferFile
buf(TBufferFile::kRead,
h.size(), (
void *)
h.streamed_histo().data(), kFALSE);
344 DEBUG(1,
obj->GetName() << std::endl);
345 DEBUG(1,
"Flags: " <<
h.flags() << std::endl);
355 FileInputStream
fin(
fd);
357 CodedInputStream input_coded(&
input);
358 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
359 if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
360 std::cout <<
"Fatal decoding stream: " <<
fd << std::endl;
364 auto hint = micromes.begin();
368 TObject *
obj =
nullptr;
373 auto ir = micromes.insert(hint, mme);
374 if (ir->obj !=
nullptr) {
380 DEBUG(2,
"Merged MicroME " << mme.fullname() << std::endl);
383 DEBUG(2,
"Inserted MicroME " << mme.fullname() << std::endl);
398 TH1F obj_th1f(
"preload_th1f",
"preload_th1f", 2, 0, 1);
400 TBufferFile write_buffer(TBufferFile::kWrite);
401 write_buffer.WriteObject(&obj_th1f);
405 hw->
set_size(write_buffer.Length());
413 TObject *
obj =
nullptr;
423 const int fork_total,
424 const std::vector<std::string> &filenames) {
425 DEBUG(1,
"Start process: " << fork_id <<
" parent: " << (fork_id / 2) << std::endl);
427 std::list<std::pair<int, int> >
children;
430 for (
int i = 0;
i < 2; ++
i) {
431 int child_id = fork_id * 2 +
i;
432 if (child_id > fork_total)
438 int child_pid = ::fork();
439 if (child_pid == 0) {
440 ::prctl(PR_SET_PDEATHSIG, SIGKILL);
449 children.push_back(std::make_pair(
fd[0], child_pid));
458 for (
unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
460 DEBUG(1,
"Adding file " <<
file << std::endl);
463 if ((filedescriptor = ::open(
file.c_str(), O_RDONLY)) == -1) {
464 std::cout <<
"Fatal Error opening file " <<
file << std::endl;
469 addFile(microme, filedescriptor);
470 ::close(filedescriptor);
475 int fd = chpair.first;
481 ::waitpid(chpair.second, &
status, 0);
493 DEBUG(1,
"Writing file" << std::endl);
495 ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
506 std::cerr <<
"Usage: " << app_name <<
" [--[no-]debug] TASK OPTIONS\n\n " << app_name
507 <<
" [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n " << app_name
508 <<
" [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " << app_name
509 <<
" [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " << app_name <<
" [OPTIONS] dump [DAT FILE...]\n ";
518 std::vector<std::string> filenames;
521 filenames.reserve(
argc);
524 if (!strcmp(
argv[
arg],
"--no-debug"))
526 else if (!strcmp(
argv[
arg],
"--debug") || !strcmp(
argv[
arg],
"-d"))
533 if (!strcmp(
argv[
arg],
"add")) {
536 }
else if (!strcmp(
argv[
arg],
"dump")) {
539 }
else if (!strcmp(
argv[
arg],
"convert")) {
542 }
else if (!strcmp(
argv[
arg],
"encode")) {
559 std::cerr <<
"Invalid argument for -j\n";
569 std::cerr <<
"add|convert|encode actions requires a -o option to be set\n";
572 if (!strcmp(
argv[
arg],
"-o")) {
576 std::cerr <<
" -o option requires a value\n";
586 filenames.emplace_back(
argv[
arg]);
596 filenames.emplace_back(
argv[
arg]);
609 google::protobuf::ShutdownProtobufLibrary();
bool operator<(const MicroME &rhs) const
void set_flags(uint32_t value)
const std::string dirname
void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes)
static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj)
TObject * extractNextObject(TBufferFile &buf)
ret
prodAgent to be discontinued
static std::string const input
void set_streamed_histo(ArgT0 &&arg0, ArgT... args)
void set_size(uint32_t value)
const std::string fullname() const
std::vector< std::shared_ptr< fireworks::OptionNode > > Options
key
prepare the HTCondor submission files and eventually submit them
def pipe(cmdline, input=None)
int main(int argc, char *argv[])
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int nthreads
void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd)
int encodeFile(const std::string &output_filename, const std::vector< std::string > &filenames)
void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes)
const ::dqmstorepb::ROOTFilePB_Histo & histo(int index) const
void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename)
void add(TObject *obj_to_add) const
const std::string objname
int convertFile(const std::string &output_filename, const std::vector< std::string > &filenames)
::dqmstorepb::ROOTFilePB_Histo * add_histo()
MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags=0)
std::set< MicroME > MEStore
int addFiles(const std::string &output_filename, const std::vector< std::string > &filenames, int nthreads)
int addFile(MEStore &micromes, int fd)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector< std::string > &filenames)
int dumpFiles(const std::vector< std::string > &filenames)