All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: ExternalLHEProducer
4 // Class: ExternalLHEProducer
5 //
13 //
14 // Original Author: Brian Paul Bockelman,8 R-018,+41227670861,
15 // Created: Fri Oct 21 11:37:26 CEST 2011
16 //
17 //
19 // system include files
20 #include "oneapi/tbb/task_arena.h"
21 #include "oneapi/tbb/task_group.h"
22 #include <cstdio>
23 #include <cstdlib>
24 #include <dirent.h>
25 #include <fcntl.h>
26 #include <filesystem>
27 #include <fstream>
28 #include <memory>
29 #include <string>
30 #include <sys/resource.h>
31 #include <sys/time.h>
32 #include <sys/wait.h>
33 #include <system_error>
34 #include <unistd.h>
35 #include <vector>
37 #include "boost/ptr_container/ptr_deque.hpp"
39 // user include files
65 //
66 // class declaration
67 //
69 class ExternalLHEProducer : public edm::one::EDProducer<edm::BeginRunProducer, edm::one::WatchRuns> {
70 public:
71  explicit ExternalLHEProducer(const edm::ParameterSet& iConfig);
73  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
75 private:
76  void produce(edm::Event&, const edm::EventSetup&) override;
77  void beginRunProduce(edm::Run& run, edm::EventSetup const& es) override;
78  void beginRun(edm::Run const&, edm::EventSetup const&) override;
79  void endRun(edm::Run const&, edm::EventSetup const&) override;
80  void preallocThreads(unsigned int) override;
82  std::vector<std::string> makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const;
83  int closeDescriptors(int preserve) const;
84  void executeScript(std::vector<std::string> const& args, int id, bool isPost) const;
86  void nextEvent();
87  std::unique_ptr<LHERunInfoProduct> generateRunInfo(std::vector<std::string> const& files) const;
89  // ----------member data ---------------------------
92  const std::vector<std::string> args_;
93  uint32_t npars_;
94  uint32_t nEvents_;
95  bool storeXML_;
96  unsigned int nThreads_{1};
98  bool generateConcurrently_{false};
99  const std::vector<std::string> postGenerationCommand_;
101  // Used only if nPartonMapping is in the configuration
102  std::map<unsigned, std::pair<unsigned, unsigned>> nPartonMapping_{};
104  std::unique_ptr<lhef::LHEReader> reader_;
105  std::shared_ptr<lhef::LHEEvent> partonLevel_;
112  public:
113  explicit FileCloseSentry(int fd) : fd_(fd){};
115  ~FileCloseSentry() { close(fd_); }
117  //Make this noncopyable
118  FileCloseSentry(const FileCloseSentry&) = delete;
119  FileCloseSentry& operator=(const FileCloseSentry&) = delete;
121  private:
122  int fd_;
123  };
124 };
126 //
127 // constructors and destructor
128 //
130  : scriptName_((iConfig.getParameter<edm::FileInPath>("scriptName")).fullPath()),
131  outputFile_(iConfig.getParameter<std::string>("outputFile")),
132  args_(iConfig.getParameter<std::vector<std::string>>("args")),
133  npars_(iConfig.getParameter<uint32_t>("numberOfParameters")),
134  nEvents_(iConfig.getUntrackedParameter<uint32_t>("nEvents")),
135  storeXML_(iConfig.getUntrackedParameter<bool>("storeXML")),
136  generateConcurrently_(iConfig.getUntrackedParameter<bool>("generateConcurrently")),
137  postGenerationCommand_(iConfig.getUntrackedParameter<std::vector<std::string>>("postGenerationCommand")) {
138  if (npars_ != args_.size())
139  throw cms::Exception("ExternalLHEProducer")
140  << "Problem with configuration: " << args_.size() << " script arguments given, expected " << npars_;
142  if (iConfig.exists("nPartonMapping")) {
143  auto& processMap(iConfig.getParameterSetVector("nPartonMapping"));
144  for (auto& cfg : processMap) {
145  unsigned processId(cfg.getParameter<unsigned>("idprup"));
147  auto orderStr(cfg.getParameter<std::string>("order"));
148  unsigned order(0);
149  if (orderStr == "LO")
150  order = 0;
151  else if (orderStr == "NLO")
152  order = 1;
153  else
154  throw cms::Exception("ExternalLHEProducer")
155  << "Invalid order specification for process " << processId << ": " << orderStr;
157  unsigned np(cfg.getParameter<unsigned>("np"));
159  nPartonMapping_.emplace(processId, std::make_pair(order, np));
160  }
161  }
163  xmlPutToken_ = produces<LHEXMLStringProduct, edm::Transition::BeginRun>("LHEScriptOutput");
165  eventPutToken_ = produces<LHEEventProduct>();
166  beginRunPutToken_ = produces<LHERunInfoProduct, edm::Transition::BeginRun>();
167 }
169 //
170 // member functions
171 //
173 // ------------ method called with number of threads in job --
174 void ExternalLHEProducer::preallocThreads(unsigned int iThreads) { nThreads_ = iThreads; }
176 // ------------ method called to produce the data ------------
178  nextEvent();
179  if (!partonLevel_) {
181  << "No lhe event found in ExternalLHEProducer::produce(). "
182  << "The likely cause is that the lhe file contains fewer events than were requested, which is possible "
183  << "in case of phase space integration or uneweighting efficiency problems.";
184  }
186  std::unique_ptr<LHEEventProduct> product(
187  new LHEEventProduct(*partonLevel_->getHEPEUP(), partonLevel_->originalXWGTUP()));
188  if (partonLevel_->getPDF()) {
189  product->setPDF(*partonLevel_->getPDF());
190  }
191  std::for_each(partonLevel_->weights().begin(),
192  partonLevel_->weights().end(),
193  std::bind(&LHEEventProduct::addWeight, product.get(), std::placeholders::_1));
194  product->setScales(partonLevel_->scales());
195  product->setEvtNum(partonLevel_->evtnum());
196  if (nPartonMapping_.empty()) {
197  product->setNpLO(partonLevel_->npLO());
198  product->setNpNLO(partonLevel_->npNLO());
199  } else {
200  // overwrite npLO and npNLO values by user-specified mapping
201  unsigned processId(partonLevel_->getHEPEUP()->IDPRUP);
202  unsigned order(0);
203  unsigned np(0);
204  try {
205  auto procDef(;
206  order = procDef.first;
207  np = procDef.second;
208  } catch (std::out_of_range&) {
209  throw cms::Exception("ExternalLHEProducer")
210  << "Unexpected IDPRUP encountered: " << partonLevel_->getHEPEUP()->IDPRUP;
211  }
213  switch (order) {
214  case 0:
215  product->setNpLO(np);
216  product->setNpNLO(-1);
217  break;
218  case 1:
219  product->setNpLO(-1);
220  product->setNpNLO(np);
221  break;
222  default:
223  break;
224  }
225  }
227  std::for_each(partonLevel_->getComments().begin(),
228  partonLevel_->getComments().end(),
229  std::bind(&LHEEventProduct::addComment, product.get(), std::placeholders::_1));
231  iEvent.put(eventPutToken_, std::move(product));
233  partonLevel_.reset();
234  return;
235 }
237 // ------------ method called when starting to processes a run ------------
239  // pass the number of events as previous to last argument
241  // pass the random number generator seed as last argument
245  if (!rng.isAvailable()) {
246  throw cms::Exception("Configuration")
247  << "The ExternalLHEProducer module requires the RandomNumberGeneratorService\n"
248  "which is not present in the configuration file. You must add the service\n"
249  "in the configuration file if you want to run ExternalLHEProducer";
250  }
252  std::vector<std::string> infiles;
253  auto const seed = rng->mySeed();
254  if (generateConcurrently_) {
255  infiles.resize(nThreads_);
256  auto const nEventsAve = nEvents_ / nThreads_;
257  unsigned int const overflow = nThreads_ - (nEvents_ % nThreads_);
258  std::exception_ptr except;
259  std::atomic<char> exceptSet{0};
261  tbb::this_task_arena::isolate([this, &except, &infiles, &exceptSet, nEventsAve, overflow, seed]() {
262  tbb::task_group group;
263  for (unsigned int t = 0; t < nThreads_; ++t) {
264  uint32_t nEvents = nEventsAve;
265  if (nEvents_ % nThreads_ != 0 and t >= overflow) {
266  nEvents += 1;
267  }
268[t, this, &infiles, seed, nEvents, &except, &exceptSet]() {
269  CMS_SA_ALLOW try {
270  using namespace std::filesystem;
271  using namespace std::string_literals;
272  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
273  infiles[t] = out.native();
274  executeScript(makeArgs(nEvents, 1, seed + t), t, false);
275  } catch (...) {
276  char expected = 0;
277  if (exceptSet.compare_exchange_strong(expected, 1)) {
278  except = std::current_exception();
280  }
281  }
282  });
283  }
284  group.wait();
285  });
286  if (exceptSet != 0) {
287  std::rethrow_exception(except);
288  }
289  } else {
290  infiles = std::vector<std::string>(1, outputFile_);
292  }
294  //run post-generation command if specified
295  if (!postGenerationCommand_.empty()) {
296  std::vector<std::string> postcmd = postGenerationCommand_;
297  try {
298  postcmd[0] = edm::FileInPath(postcmd[0]).fullPath();
299  } catch (const edm::Exception& e) {
300  edm::LogWarning("ExternalLHEProducer") << postcmd[0] << " is not a relative path. Run it as a shell command.";
301  }
302  executeScript(postcmd, 0, true);
303  }
305  //fill LHEXMLProduct (streaming read directly into compressed buffer to save memory)
306  std::unique_ptr<LHEXMLStringProduct> p(new LHEXMLStringProduct);
308  //store the XML file only if explictly requested
309  if (storeXML_) {
311  if (generateConcurrently_) {
312  using namespace std::filesystem;
313  file = (path("thread0") / path(outputFile_)).native();
314  } else {
315  file = outputFile_;
316  }
317  std::ifstream instream(file);
318  if (!instream) {
319  throw cms::Exception("OutputOpenError") << "Unable to open script output file " << outputFile_ << ".";
320  }
321  instream.seekg(0, instream.end);
322  int insize = instream.tellg();
323  instream.seekg(0, instream.beg);
324  p->fillCompressedContent(instream, 0.25 * insize);
325  instream.close();
326  }
327  run.put(xmlPutToken_, std::move(p));
329  //Read the beginning of each file to get the run info in order to do the merge
330  auto runInfo = generateRunInfo(infiles);
331  if (runInfo) {
333  }
335  // LHE C++ classes translation
336  // (read back uncompressed file from disk in streaming mode again to save memory)
337  unsigned int skip = 0;
338  reader_ = std::make_unique<lhef::LHEReader>(infiles, skip);
340  nextEvent();
341 }
344 // ------------ method called when ending the processing of a run ------------
346  nextEvent();
347  if (partonLevel_) {
348  // VALIDATION_RUN env variable allows to finish event processing early without errors by sending SIGINT
349  if (std::getenv("VALIDATION_RUN") != nullptr) {
350  edm::LogWarning("ExternalLHEProducer")
351  << "Event loop is over, but there are still lhe events to process, ignoring...";
352  } else {
354  << "Error in ExternalLHEProducer::endRunProduce(). "
355  << "Event loop is over, but there are still lhe events to process."
356  << "This could happen if lhe file contains more events than requested. This is never expected to happen.";
357  }
358  }
360  reader_.reset();
361  if (generateConcurrently_) {
362  for (unsigned int t = 0; t < nThreads_; ++t) {
363  using namespace std::filesystem;
364  using namespace std::string_literals;
365  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
366  if (unlink(out.c_str())) {
367  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << out
368  << " (errno=" << errno << ", " << strerror(errno) << ").";
369  }
370  }
371  } else {
372  if (unlink(outputFile_.c_str())) {
373  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << outputFile_
374  << " (errno=" << errno << ", " << strerror(errno) << ").";
375  }
376  }
377 }
379 std::vector<std::string> ExternalLHEProducer::makeArgs(uint32_t nEvents,
380  unsigned int nThreads,
381  std::uint32_t seed) const {
382  std::vector<std::string> args;
383  args.reserve(3 + args_.size());
385  args.push_back(args_.front());
386  args.push_back(std::to_string(nEvents));
388  args.push_back(std::to_string(seed));
390  args.push_back(std::to_string(nThreads));
391  std::copy(args_.begin() + 1, args_.end(), std::back_inserter(args));
393  for (unsigned int iArg = 0; iArg < args.size(); iArg++) {
394  LogDebug("LHEInputArgs") << "arg [" << iArg << "] = " << args[iArg];
395  }
397  return args;
398 }
400 // ------------ Close all the open file descriptors ------------
401 int ExternalLHEProducer::closeDescriptors(int preserve) const {
402  int maxfd = 1024;
403  int fd;
404 #ifdef __linux__
405  DIR* dir;
406  struct dirent* dp;
407  maxfd = preserve;
408  if ((dir = opendir("/proc/self/fd"))) {
409  errno = 0;
410  while ((dp = readdir(dir)) != nullptr) {
411  if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) {
412  continue;
413  }
414  if (sscanf(dp->d_name, "%d", &fd) != 1) {
415  //throw cms::Exception("closeDescriptors") << "Found unexpected filename in /proc/self/fd: " << dp->d_name;
416  return -1;
417  }
418  if (fd > maxfd) {
419  maxfd = fd;
420  }
421  }
422  if (errno) {
423  //throw cms::Exception("closeDescriptors") << "Unable to determine the number of fd (errno=" << errno << ", " << strerror(errno) << ").";
424  return errno;
425  }
426  closedir(dir);
427  }
428 #endif
429  // TODO: assert for an unreasonable number of fds?
430  for (fd = 3; fd < maxfd + 1; fd++) {
431  if (fd != preserve)
432  close(fd);
433  }
434  return 0;
435 }
437 // ------------ Execute the script associated with this producer ------------
438 void ExternalLHEProducer::executeScript(std::vector<std::string> const& args, int id, bool isPost) const {
439  // Fork a script, wait until it finishes.
441  int rc = 0, rc2 = 0;
442  int filedes[2], fd_flags;
444  if (pipe(filedes)) {
445  throw cms::Exception("Unable to create a new pipe");
446  }
447  FileCloseSentry sentry1(filedes[0]), sentry2(filedes[1]);
449  if ((fd_flags = fcntl(filedes[1], F_GETFD, NULL)) == -1) {
450  throw cms::Exception("ExternalLHEProducer")
451  << "Failed to get pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
452  }
453  if (fcntl(filedes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
454  throw cms::Exception("ExternalLHEProducer")
455  << "Failed to set pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
456  }
458  unsigned int argc_pre = 0;
459  // For generation command the first argument gives to the scriptName
460  if (!isPost) {
461  argc_pre = 1;
462  }
463  unsigned int argc = argc_pre + args.size();
464  // TODO: assert that we have a reasonable number of arguments
465  char** argv = new char*[argc + 1];
466  if (!isPost) {
467  argv[0] = strdup(scriptName_.c_str());
468  }
469  for (unsigned int i = 0; i < args.size(); i++) {
470  argv[argc_pre + i] = strdup(args[i].c_str());
471  }
472  argv[argc] = nullptr;
474  pid_t pid = fork();
475  if (pid == 0) {
476  // The child process
477  if (!(rc = closeDescriptors(filedes[1]))) {
478  if (!isPost && generateConcurrently_) {
479  using namespace std::filesystem;
480  using namespace std::string_literals;
481  std::error_code ec;
482  auto newDir = path("thread"s + std::to_string(id));
483  create_directory(newDir, ec);
484  current_path(newDir, ec);
485  }
486  execvp(argv[0], argv); // If execv returns, we have an error.
487  rc = errno;
488  }
489  while ((write(filedes[1], &rc, sizeof(int)) == -1) && (errno == EINTR)) {
490  }
491  _exit(1);
492  }
494  // Free the arg vector ASAP
495  for (unsigned int i = 0; i < args.size() + 1; i++) {
496  free(argv[i]);
497  }
498  delete[] argv;
500  if (pid == -1) {
501  throw cms::Exception("ForkException")
502  << "Unable to fork a child (errno=" << errno << ", " << strerror(errno) << ")";
503  }
505  close(filedes[1]);
506  // If the exec succeeds, the read will fail.
507  while (((rc2 = read(filedes[0], &rc, sizeof(int))) == -1) && (errno == EINTR)) {
508  rc2 = 0;
509  }
510  if ((rc2 == sizeof(int)) && rc) {
511  throw cms::Exception("ExternalLHEProducer")
512  << "Failed to execute script (errno=" << rc << ", " << strerror(rc) << ")";
513  }
514  close(filedes[0]);
516  int status = 0;
517  errno = 0;
518  do {
519  if (waitpid(pid, &status, 0) < 0) {
520  if (errno == EINTR) {
521  continue;
522  } else {
523  throw cms::Exception("ExternalLHEProducer")
524  << "Failed to read child status (errno=" << errno << ", " << strerror(errno) << ")";
525  }
526  }
527  if (WIFSIGNALED(status)) {
528  throw cms::Exception("ExternalLHEProducer") << "Child exited due to signal " << WTERMSIG(status) << ".";
529  }
530  if (WIFEXITED(status)) {
531  rc = WEXITSTATUS(status);
532  break;
533  }
534  } while (true);
535  if (rc) {
536  throw cms::Exception("ExternalLHEProducer") << "Child failed with exit code " << rc << ".";
537  }
538 }
540 // ------------ method fills 'descriptions' with the allowed parameters for the module ------------
542  //The following says we do not know what parameters are allowed so do no validation
543  // Please change this to state exactly what you do use, even if it is no parameters
545  desc.setComment("Executes an external script and places its output file into an EDM collection");
547  edm::FileInPath thePath;
548  desc.add<edm::FileInPath>("scriptName", thePath);
549  desc.add<std::string>("outputFile", "myoutput");
550  desc.add<std::vector<std::string>>("args");
551  desc.add<uint32_t>("numberOfParameters");
552  desc.addUntracked<uint32_t>("nEvents");
553  desc.addUntracked<bool>("storeXML", false);
554  desc.addUntracked<bool>("generateConcurrently", false)
555  ->setComment("If true, run the script concurrently in separate processes.");
556  desc.addUntracked<std::vector<std::string>>("postGenerationCommand", std::vector<std::string>())
557  ->setComment(
558  "Command to run after the generation script has completed. The first argument can be a relative path.");
560  edm::ParameterSetDescription nPartonMappingDesc;
561  nPartonMappingDesc.add<unsigned>("idprup");
562  nPartonMappingDesc.add<std::string>("order");
563  nPartonMappingDesc.add<unsigned>("np");
564  desc.addVPSetOptional("nPartonMapping", nPartonMappingDesc);
566  descriptions.addDefault(desc);
567 }
569 std::unique_ptr<LHERunInfoProduct> ExternalLHEProducer::generateRunInfo(std::vector<std::string> const& iFiles) const {
570  std::unique_ptr<LHERunInfoProduct> retValue;
571  //read each file in turn and only get the header info
572  for (auto const& file : iFiles) {
573  unsigned int skip = 0;
574  std::vector<std::string> infiles(1, file);
575  auto reader = std::make_unique<lhef::LHEReader>(infiles, skip);
576  auto parton = reader->next();
577  if (!parton) {
578  break;
579  }
580  auto runInfo = parton->getRunInfo();
581  LHERunInfoProduct product(*runInfo->getHEPRUP());
583  std::for_each(runInfo->getHeaders().begin(),
584  runInfo->getHeaders().end(),
585  std::bind(&LHERunInfoProduct::addHeader, &product, std::placeholders::_1));
586  std::for_each(runInfo->getComments().begin(),
587  runInfo->getComments().end(),
588  std::bind(&LHERunInfoProduct::addComment, &product, std::placeholders::_1));
589  if (not retValue) {
590  retValue = std::make_unique<LHERunInfoProduct>(std::move(product));
591  } else {
592  retValue->mergeProduct(product);
593  }
594  }
596  return retValue;
597 }
600  if (partonLevel_)
601  return;
603  if (not reader_) {
604  return;
605  }
607  partonLevel_ = reader_->next();
608  if (!partonLevel_) {
609  //see if we have another file to read;
610  bool newFileOpened;
611  do {
612  newFileOpened = false;
613  partonLevel_ = reader_->next(&newFileOpened);
614  } while (newFileOpened && !partonLevel_);
615  }
616 }
618 //define this as a plug-in
void beginRunProduce(edm::Run &run, edm::EventSetup const &es) override
#define CMS_SA_ALLOW
void addHeader(const Header &header)
std::string fullPath() const
void addComment(const std::string &line)
void produce(edm::Event &, const edm::EventSetup &) override
bool exists(std::string const &parameterName) const
checks if a parameter exists
void addWeight(const WGT &wgt)
#define NULL
Definition: scimark2.h:8
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
edm::EDPutTokenT< LHERunInfoProduct > beginRunPutToken_
edm::EDPutTokenT< LHEEventProduct > eventPutToken_
static std::string to_string(const XMLCh *ch)
void free(void *ptr) noexcept
std::unique_ptr< LHERunInfoProduct > generateRunInfo(std::vector< std::string > const &files) const
int iEvent
void addDefault(ParameterSetDescription const &psetDescription)
virtual std::uint32_t mySeed() const =0
int np
Definition: AMPTWrapper.h:43
edm::EDPutTokenT< LHEXMLStringProduct > xmlPutToken_
std::unique_ptr< lhef::LHEReader > reader_
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::map< unsigned, std::pair< unsigned, unsigned > > nPartonMapping_
def pipe(cmdline, input=None)
ParameterDescriptionBase * add(U const &iLabel, T const &value)
int closeDescriptors(int preserve) const
void executeScript(std::vector< std::string > const &args, int id, bool isPost) const
const std::vector< std::string > args_
std::vector< std::string > makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const
const std::vector< std::string > postGenerationCommand_
void addComment(const std::string &line)
void beginRun(edm::Run const &, edm::EventSetup const &) override
void preallocThreads(unsigned int) override
bool mergeProduct(const LHERunInfoProduct &other)
HLT enums.
VParameterSet const & getParameterSetVector(std::string const &name) const
FileCloseSentry & operator=(const FileCloseSentry &)=delete
bool isAvailable() const
Definition: Service.h:40
Log< level::Warning, false > LogWarning
ExternalLHEProducer(const edm::ParameterSet &iConfig)
def move(src, dest)
Definition: Run.h:45
std::shared_ptr< lhef::LHEEvent > partonLevel_
#define LogDebug(id)
void endRun(edm::Run const &, edm::EventSetup const &) override