CMS 3D CMS Logo

ExternalLHEProducer.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: ExternalLHEProducer
4 // Class: ExternalLHEProducer
5 //
13 //
14 // Original Author: Brian Paul Bockelman,8 R-018,+41227670861,
15 // Created: Fri Oct 21 11:37:26 CEST 2011
16 //
17 //
18 
19 
20 // system include files
21 #include <cstdio>
22 #include <memory>
23 #include <vector>
24 #include <string>
25 #include <fstream>
26 #include "boost/filesystem.hpp"
27 #include <unistd.h>
28 #include <dirent.h>
29 #include <fcntl.h>
30 #include <sys/wait.h>
31 #include <sys/time.h>
32 #include <sys/resource.h>
33 #include "tbb/task_arena.h"
34 
35 
36 #include "boost/bind.hpp"
37 #include "boost/shared_ptr.hpp"
38 #include "boost/ptr_container/ptr_deque.hpp"
39 
40 // user include files
44 
48 
50 
53 
58 
62 
65 
67 
68 //
69 // class declaration
70 //
71 
72 class ExternalLHEProducer : public edm::one::EDProducer<edm::BeginRunProducer,
73  edm::EndRunProducer> {
74 public:
75  explicit ExternalLHEProducer(const edm::ParameterSet& iConfig);
76 
77  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
78 
79 private:
80 
81  void produce(edm::Event&, const edm::EventSetup&) override;
82  void beginRunProduce(edm::Run& run, edm::EventSetup const& es) override;
83  void endRunProduce(edm::Run&, edm::EventSetup const&) override;
84  void preallocThreads(unsigned int) override;
85 
86  std::vector<std::string> makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const;
87  int closeDescriptors(int preserve) const;
88  void executeScript(std::vector<std::string> const& args, int id) const;
89 
90  void nextEvent();
91 
92  // ----------member data ---------------------------
95  const std::vector<std::string> args_;
96  uint32_t npars_;
97  uint32_t nEvents_;
98  bool storeXML_;
99  unsigned int nThreads_{1};
102 
103  // Used only if nPartonMapping is in the configuration
104  std::map<unsigned, std::pair<unsigned, unsigned>> nPartonMapping_{};
105 
106  std::unique_ptr<lhef::LHEReader> reader_;
107  std::shared_ptr<lhef::LHERunInfo> runInfoLast_;
108  std::shared_ptr<lhef::LHERunInfo> runInfo_;
109  std::shared_ptr<lhef::LHEEvent> partonLevel_;
110  boost::ptr_deque<LHERunInfoProduct> runInfoProducts_;
111  bool wasMerged;
112 
113  class FileCloseSentry : private boost::noncopyable {
114  public:
115  explicit FileCloseSentry(int fd) : fd_(fd) {};
116 
118  close(fd_);
119  }
120  private:
121  int fd_;
122  };
123 
124 };
125 
126 //
127 // constructors and destructor
128 //
130  scriptName_((iConfig.getParameter<edm::FileInPath>("scriptName")).fullPath()),
131  outputFile_(iConfig.getParameter<std::string>("outputFile")),
132  args_(iConfig.getParameter<std::vector<std::string> >("args")),
133  npars_(iConfig.getParameter<uint32_t>("numberOfParameters")),
134  nEvents_(iConfig.getUntrackedParameter<uint32_t>("nEvents")),
135  storeXML_(iConfig.getUntrackedParameter<bool>("storeXML")),
136  generateConcurrently_(iConfig.getUntrackedParameter<bool>("generateConcurrently"))
137 {
138  if (npars_ != args_.size())
139  throw cms::Exception("ExternalLHEProducer") << "Problem with configuration: " << args_.size() << " script arguments given, expected " << npars_;
140 
141  if (iConfig.exists("nPartonMapping")) {
142  auto& processMap(iConfig.getParameterSetVector("nPartonMapping"));
143  for (auto& cfg : processMap) {
144  unsigned processId(cfg.getParameter<unsigned>("idprup"));
145 
146  auto orderStr(cfg.getParameter<std::string>("order"));
147  unsigned order(0);
148  if (orderStr == "LO")
149  order = 0;
150  else if (orderStr == "NLO")
151  order = 1;
152  else
153  throw cms::Exception("ExternalLHEProducer") << "Invalid order specification for process " << processId << ": " << orderStr;
154 
155  unsigned np(cfg.getParameter<unsigned>("np"));
156 
157  nPartonMapping_.emplace(processId, std::make_pair(order, np));
158  }
159  }
160 
161  produces<LHEXMLStringProduct, edm::Transition::BeginRun>("LHEScriptOutput");
162 
163  produces<LHEEventProduct>();
164  produces<LHERunInfoProduct, edm::Transition::BeginRun>();
165  produces<LHERunInfoProduct, edm::Transition::EndRun>();
166 }
167 
168 
169 //
170 // member functions
171 //
172 
173 // ------------ method called with number of threads in job --
174 void
176 {
177  nThreads_ = iThreads;
178 }
179 
180 // ------------ method called to produce the data ------------
181 void
183 {
184  nextEvent();
185  if (!partonLevel_) {
186  throw edm::Exception(edm::errors::EventGenerationFailure) << "No lhe event found in ExternalLHEProducer::produce(). "
187  << "The likely cause is that the lhe file contains fewer events than were requested, which is possible "
188  << "in case of phase space integration or uneweighting efficiency problems.";
189  }
190 
191  std::unique_ptr<LHEEventProduct> product(
192  new LHEEventProduct(*partonLevel_->getHEPEUP(),
193  partonLevel_->originalXWGTUP())
194  );
195  if (partonLevel_->getPDF()) {
196  product->setPDF(*partonLevel_->getPDF());
197  }
198  std::for_each(partonLevel_->weights().begin(),
199  partonLevel_->weights().end(),
200  boost::bind(&LHEEventProduct::addWeight,
201  product.get(), _1));
202  product->setScales(partonLevel_->scales());
203  if (nPartonMapping_.empty()) {
204  product->setNpLO(partonLevel_->npLO());
205  product->setNpNLO(partonLevel_->npNLO());
206  }
207  else {
208  // overwrite npLO and npNLO values by user-specified mapping
209  unsigned processId(partonLevel_->getHEPEUP()->IDPRUP);
210  unsigned order(0);
211  unsigned np(0);
212  try {
213  auto procDef(nPartonMapping_.at(processId));
214  order = procDef.first;
215  np = procDef.second;
216  }
217  catch (std::out_of_range&) {
218  throw cms::Exception("ExternalLHEProducer") << "Unexpected IDPRUP encountered: " << partonLevel_->getHEPEUP()->IDPRUP;
219  }
220 
221  switch (order) {
222  case 0:
223  product->setNpLO(np);
224  product->setNpNLO(-1);
225  break;
226  case 1:
227  product->setNpLO(-1);
228  product->setNpNLO(np);
229  break;
230  default:
231  break;
232  }
233  }
234 
235  std::for_each(partonLevel_->getComments().begin(),
236  partonLevel_->getComments().end(),
237  boost::bind(&LHEEventProduct::addComment,
238  product.get(), _1));
239 
240  iEvent.put(std::move(product));
241 
242  if (runInfo_) {
243  std::unique_ptr<LHERunInfoProduct> product(new LHERunInfoProduct(*runInfo_->getHEPRUP()));
244  std::for_each(runInfo_->getHeaders().begin(),
245  runInfo_->getHeaders().end(),
246  boost::bind(&LHERunInfoProduct::addHeader,
247  product.get(), _1));
248  std::for_each(runInfo_->getComments().begin(),
249  runInfo_->getComments().end(),
250  boost::bind(&LHERunInfoProduct::addComment,
251  product.get(), _1));
252 
253  if (!runInfoProducts_.empty()) {
254  runInfoProducts_.front().mergeProduct(*product);
255  if (!wasMerged) {
256  runInfoProducts_.pop_front();
257  runInfoProducts_.push_front(product.release());
258  wasMerged = true;
259  }
260  }
261 
262  runInfo_.reset();
263  }
264 
265  partonLevel_.reset();
266  return;
267 }
268 
269 // ------------ method called when starting to processes a run ------------
270 void
272 {
273 
274  // pass the number of events as previous to last argument
275 
276  // pass the random number generator seed as last argument
277 
279 
280  if ( ! rng.isAvailable()) {
281  throw cms::Exception("Configuration")
282  << "The ExternalLHEProducer module requires the RandomNumberGeneratorService\n"
283  "which is not present in the configuration file. You must add the service\n"
284  "in the configuration file if you want to run ExternalLHEProducer";
285  }
286 
287  std::vector<std::string> infiles;
288  auto const seed = rng->mySeed();
289  if (generateConcurrently_) {
290  infiles.resize(nThreads_);
291  auto const nEventsAve = nEvents_ / nThreads_;
292  unsigned int const overflow = nThreads_ - (nEvents_ % nThreads_);
293  std::exception_ptr except;
294  std::atomic<char> exceptSet{0};
295 
296  tbb::this_task_arena::isolate([this, &except, &infiles, &exceptSet, nEventsAve, overflow, seed]() {
297  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
298  waitTask->set_ref_count(1 + nThreads_);
299 
300  for (unsigned int t = 0; t < nThreads_; ++t) {
301  uint32_t nEvents = nEventsAve;
302  if (nEvents_ % nThreads_ != 0 and t >= overflow) {
303  nEvents += 1;
304  }
305  auto task = edm::make_functor_task(tbb::task::allocate_root(),
306  [t, this, &infiles, seed, nEvents, &except, &exceptSet, waitTask]() {
307  try {
308  using namespace boost::filesystem;
309  using namespace std::string_literals;
310  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
311  infiles[t] = out.native();
312  executeScript(makeArgs(nEvents, 1, seed + t), t);
313  } catch (...) {
314  char expected = 0;
315  if (exceptSet.compare_exchange_strong(expected, 1)) {
316  except = std::current_exception();
317  exceptSet.store(2);
318  }
319  }
320  waitTask->decrement_ref_count();
321  });
322  tbb::task::spawn(*task);
323  }
324  waitTask->wait_for_all();
325  tbb::task::destroy(*waitTask);
326  });
327  if (exceptSet != 0) {
328  std::rethrow_exception(except);
329  }
330  } else {
331  infiles = std::vector<std::string>(1, outputFile_);
333  }
334 
335  //fill LHEXMLProduct (streaming read directly into compressed buffer to save memory)
336  std::unique_ptr<LHEXMLStringProduct> p(new LHEXMLStringProduct);
337 
338  //store the XML file only if explictly requested
339  if (storeXML_) {
341  if (generateConcurrently_) {
342  using namespace boost::filesystem;
343  file = (path("thread0") / path(outputFile_)).native();
344  } else {
345  file = outputFile_;
346  }
347  std::ifstream instream(file);
348  if (!instream) {
349  throw cms::Exception("OutputOpenError") << "Unable to open script output file " << outputFile_ << ".";
350  }
351  instream.seekg (0, instream.end);
352  int insize = instream.tellg();
353  instream.seekg (0, instream.beg);
354  p->fillCompressedContent(instream, 0.25*insize);
355  instream.close();
356  }
357  run.put(std::move(p), "LHEScriptOutput");
358 
359  // LHE C++ classes translation
360  // (read back uncompressed file from disk in streaming mode again to save memory)
361 
362  unsigned int skip = 0;
363  reader_ = std::make_unique<lhef::LHEReader>(infiles, skip);
364 
365  nextEvent();
366  if (runInfoLast_) {
368 
369  std::unique_ptr<LHERunInfoProduct> product(new LHERunInfoProduct(*runInfo_->getHEPRUP()));
370  std::for_each(runInfo_->getHeaders().begin(),
371  runInfo_->getHeaders().end(),
372  boost::bind(&LHERunInfoProduct::addHeader,
373  product.get(), _1));
374  std::for_each(runInfo_->getComments().begin(),
375  runInfo_->getComments().end(),
376  boost::bind(&LHERunInfoProduct::addComment,
377  product.get(), _1));
378 
379  // keep a copy around in case of merging
380  runInfoProducts_.push_back(new LHERunInfoProduct(*product));
381  wasMerged = false;
382 
383  run.put(std::move(product));
384 
385  runInfo_.reset();
386  }
387 
388 }
389 
390 // ------------ method called when ending the processing of a run ------------
391 void
393 {
394 
395  if (!runInfoProducts_.empty()) {
396  std::unique_ptr<LHERunInfoProduct> product(runInfoProducts_.pop_front().release());
397  run.put(std::move(product));
398  }
399 
400  nextEvent();
401  if (partonLevel_) {
402  throw edm::Exception(edm::errors::EventGenerationFailure) << "Error in ExternalLHEProducer::endRunProduce(). "
403  << "Event loop is over, but there are still lhe events to process."
404  << "This could happen if lhe file contains more events than requested. This is never expected to happen.";
405  }
406 
407  reader_.reset();
408  if (generateConcurrently_) {
409  for (unsigned int t = 0; t < nThreads_; ++t) {
410  using namespace boost::filesystem;
411  using namespace std::string_literals;
412  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
413  if (unlink(out.c_str())) {
414  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << out
415  << " (errno=" << errno << ", " << strerror(errno) << ").";
416  }
417  }
418  } else {
419  if (unlink(outputFile_.c_str())) {
420  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << outputFile_
421  << " (errno=" << errno << ", " << strerror(errno) << ").";
422  }
423  }
424 }
425 
426 std::vector<std::string> ExternalLHEProducer::makeArgs(uint32_t nEvents,
427  unsigned int nThreads,
428  std::uint32_t seed) const {
429  std::vector<std::string> args;
430  args.reserve(3 + args_.size());
431 
432  args.push_back(args_.front());
433  args.push_back(std::to_string(nEvents));
434 
435  args.push_back(std::to_string(seed));
436 
437  args.push_back(std::to_string(nThreads));
438  std::copy(args_.begin() + 1, args_.end(), std::back_inserter(args));
439 
440  for (unsigned int iArg = 0; iArg < args.size(); iArg++) {
441  LogDebug("LHEInputArgs") << "arg [" << iArg << "] = " << args[iArg];
442  }
443 
444  return args;
445 }
446 
447 // ------------ Close all the open file descriptors ------------
448 int
450 {
451  int maxfd = 1024;
452  int fd;
453 #ifdef __linux__
454  DIR * dir;
455  struct dirent *dp;
456  maxfd = preserve;
457  if ((dir = opendir("/proc/self/fd"))) {
458  errno = 0;
459  while ((dp = readdir (dir)) != nullptr) {
460  if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) {
461  continue;
462  }
463  if (sscanf(dp->d_name, "%d", &fd) != 1) {
464  //throw cms::Exception("closeDescriptors") << "Found unexpected filename in /proc/self/fd: " << dp->d_name;
465  return -1;
466  }
467  if (fd > maxfd) {
468  maxfd = fd;
469  }
470  }
471  if (errno) {
472  //throw cms::Exception("closeDescriptors") << "Unable to determine the number of fd (errno=" << errno << ", " << strerror(errno) << ").";
473  return errno;
474  }
475  closedir(dir);
476  }
477 #endif
478  // TODO: assert for an unreasonable number of fds?
479  for (fd=3; fd<maxfd+1; fd++) {
480  if (fd != preserve)
481  close(fd);
482  }
483  return 0;
484 }
485 
486 // ------------ Execute the script associated with this producer ------------
487 void
488 ExternalLHEProducer::executeScript(std::vector<std::string> const& args, int id) const
489 {
490 
491  // Fork a script, wait until it finishes.
492 
493  int rc = 0, rc2 = 0;
494  int filedes[2], fd_flags;
495 
496  if (pipe(filedes)) {
497  throw cms::Exception("Unable to create a new pipe");
498  }
499  FileCloseSentry sentry1(filedes[0]), sentry2(filedes[1]);
500 
501  if ((fd_flags = fcntl(filedes[1], F_GETFD, NULL)) == -1) {
502  throw cms::Exception("ExternalLHEProducer") << "Failed to get pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
503  }
504  if (fcntl(filedes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
505  throw cms::Exception("ExternalLHEProducer") << "Failed to set pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
506  }
507 
508  unsigned int argc = 1 + args.size();
509  // TODO: assert that we have a reasonable number of arguments
510  char **argv = new char *[argc+1];
511  argv[0] = strdup(scriptName_.c_str());
512  for (unsigned int i=1; i<argc; i++) {
513  argv[i] = strdup(args[i-1].c_str());
514  }
515  argv[argc] = nullptr;
516 
517  pid_t pid = fork();
518  if (pid == 0) {
519  // The child process
520  if (!(rc = closeDescriptors(filedes[1]))) {
521  if (generateConcurrently_) {
522  using namespace boost::filesystem;
523  using namespace std::string_literals;
524  boost::system::error_code ec;
525  auto newDir = path("thread"s + std::to_string(id));
526  create_directory(newDir, ec);
527  current_path(newDir, ec);
528  }
529  execvp(argv[0], argv); // If execv returns, we have an error.
530  rc = errno;
531  }
532  while ((write(filedes[1], &rc, sizeof(int)) == -1) && (errno == EINTR)) {}
533  _exit(1);
534  }
535 
536  // Free the arg vector ASAP
537  for (unsigned int i=0; i<args.size()+1; i++) {
538  free(argv[i]);
539  }
540  delete [] argv;
541 
542  if (pid == -1) {
543  throw cms::Exception("ForkException") << "Unable to fork a child (errno=" << errno << ", " << strerror(errno) << ")";
544  }
545 
546  close(filedes[1]);
547  // If the exec succeeds, the read will fail.
548  while (((rc2 = read(filedes[0], &rc, sizeof(int))) == -1) && (errno == EINTR)) { rc2 = 0; }
549  if ((rc2 == sizeof(int)) && rc) {
550  throw cms::Exception("ExternalLHEProducer") << "Failed to execute script (errno=" << rc << ", " << strerror(rc) << ")";
551  }
552  close(filedes[0]);
553 
554  int status = 0;
555  errno = 0;
556  do {
557  if (waitpid(pid, &status, 0) < 0) {
558  if (errno == EINTR) {
559  continue;
560  } else {
561  throw cms::Exception("ExternalLHEProducer") << "Failed to read child status (errno=" << errno << ", " << strerror(errno) << ")";
562  }
563  }
564  if (WIFSIGNALED(status)) {
565  throw cms::Exception("ExternalLHEProducer") << "Child exited due to signal " << WTERMSIG(status) << ".";
566  }
567  if (WIFEXITED(status)) {
568  rc = WEXITSTATUS(status);
569  break;
570  }
571  } while (true);
572  if (rc) {
573  throw cms::Exception("ExternalLHEProducer") << "Child failed with exit code " << rc << ".";
574  }
575 
576 }
577 
578 // ------------ method fills 'descriptions' with the allowed parameters for the module ------------
579 void
581  //The following says we do not know what parameters are allowed so do no validation
582  // Please change this to state exactly what you do use, even if it is no parameters
584  desc.setComment("Executes an external script and places its output file into an EDM collection");
585 
586  edm::FileInPath thePath;
587  desc.add<edm::FileInPath>("scriptName", thePath);
588  desc.add<std::string>("outputFile", "myoutput");
589  desc.add<std::vector<std::string> >("args");
590  desc.add<uint32_t>("numberOfParameters");
591  desc.addUntracked<uint32_t>("nEvents");
592  desc.addUntracked<bool>("storeXML", false);
593  desc.addUntracked<bool>("generateConcurrently", false)
594  ->setComment("If true, run the script concurrently in separate processes.");
595 
596  edm::ParameterSetDescription nPartonMappingDesc;
597  nPartonMappingDesc.add<unsigned>("idprup");
598  nPartonMappingDesc.add<std::string>("order");
599  nPartonMappingDesc.add<unsigned>("np");
600  desc.addVPSetOptional("nPartonMapping", nPartonMappingDesc);
601 
602  descriptions.addDefault(desc);
603 }
604 
606 {
607 
608  if (partonLevel_)
609  return;
610 
611  if(not reader_) { return;}
612 
613  partonLevel_ = reader_->next();
614  if (!partonLevel_) {
615  //see if we have another file to read;
616  bool newFileOpened;
617  do {
618  newFileOpened = false;
619  partonLevel_ = reader_->next(&newFileOpened);
620  } while (newFileOpened && !partonLevel_);
621  }
622  if (!partonLevel_)
623  return;
624 
625  std::shared_ptr<lhef::LHERunInfo> runInfoThis = partonLevel_->getRunInfo();
626  if (runInfoThis != runInfoLast_) {
627  runInfo_ = runInfoThis;
628  runInfoLast_ = runInfoThis;
629  }
630 }
631 
632 //define this as a plug-in
#define LogDebug(id)
void beginRunProduce(edm::Run &run, edm::EventSetup const &es) override
VParameterSet const & getParameterSetVector(std::string const &name) const
void endRunProduce(edm::Run &, edm::EventSetup const &) override
OrphanHandle< PROD > put(std::unique_ptr< PROD > product)
Put a new product.
Definition: Event.h:125
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void addHeader(const Header &header)
def copy(args, dbName)
void addComment(const std::string &line)
def destroy(e)
Definition: pyrootRender.py:15
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
ParameterDescriptionBase * addVPSetOptional(U const &iLabel, ParameterSetDescription const &validator, std::vector< ParameterSet > const &defaults)
void produce(edm::Event &, const edm::EventSetup &) override
void addWeight(const WGT &wgt)
bool exists(std::string const &parameterName) const
checks if a parameter exists
#define NULL
Definition: scimark2.h:8
boost::ptr_deque< LHERunInfoProduct > runInfoProducts_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void setComment(std::string const &value)
int iEvent
Definition: GenABIO.cc:224
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
void addDefault(ParameterSetDescription const &psetDescription)
int np
Definition: AMPTWrapper.h:33
std::shared_ptr< lhef::LHERunInfo > runInfoLast_
std::shared_ptr< lhef::LHERunInfo > runInfo_
int closeDescriptors(int preserve) const
std::unique_ptr< lhef::LHEReader > reader_
bool isAvailable() const
Definition: Service.h:40
std::map< unsigned, std::pair< unsigned, unsigned > > nPartonMapping_
def pipe(cmdline, input=None)
Definition: pipe.py:5
std::vector< std::string > makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const
ParameterDescriptionBase * add(U const &iLabel, T const &value)
virtual std::uint32_t mySeed() const =0
const std::vector< std::string > args_
void addComment(const std::string &line)
void executeScript(std::vector< std::string > const &args, int id) const
void preallocThreads(unsigned int) override
void put(std::unique_ptr< PROD > product)
Put a new product.
Definition: Run.h:108
HLT enums.
dbl *** dir
Definition: mlp_gen.cc:35
def write(self, setup)
UInt_t nEvents
Definition: hcalCalib.cc:41
ExternalLHEProducer(const edm::ParameterSet &iConfig)
def move(src, dest)
Definition: eostools.py:511
Definition: Run.h:45
std::shared_ptr< lhef::LHEEvent > partonLevel_