CMS 3D CMS Logo

ExternalLHEProducer.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: ExternalLHEProducer
4 // Class: ExternalLHEProducer
5 //
13 //
14 // Original Author: Brian Paul Bockelman,8 R-018,+41227670861,
15 // Created: Fri Oct 21 11:37:26 CEST 2011
16 //
17 //
18 
19 // system include files
20 #include "oneapi/tbb/task_arena.h"
21 #include "oneapi/tbb/task_group.h"
22 #include <cstdio>
23 #include <cstdlib>
24 #include <dirent.h>
25 #include <fcntl.h>
26 #include <filesystem>
27 #include <fstream>
28 #include <memory>
29 #include <string>
30 #include <sys/resource.h>
31 #include <sys/time.h>
32 #include <sys/wait.h>
33 #include <system_error>
34 #include <unistd.h>
35 #include <vector>
36 
37 #include "boost/ptr_container/ptr_deque.hpp"
38 
39 // user include files
43 
47 
50 
55 
59 
62 
64 
65 //
66 // class declaration
67 //
68 
69 class ExternalLHEProducer : public edm::one::EDProducer<edm::BeginRunProducer, edm::one::WatchRuns> {
70 public:
71  explicit ExternalLHEProducer(const edm::ParameterSet& iConfig);
72 
73  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
74 
75 private:
76  void produce(edm::Event&, const edm::EventSetup&) override;
77  void beginRunProduce(edm::Run& run, edm::EventSetup const& es) override;
78  void beginRun(edm::Run const&, edm::EventSetup const&) override;
79  void endRun(edm::Run const&, edm::EventSetup const&) override;
80  void preallocThreads(unsigned int) override;
81 
82  std::vector<std::string> makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const;
83  int closeDescriptors(int preserve) const;
84  void executeScript(std::vector<std::string> const& args, int id, bool isPost) const;
85 
86  void nextEvent();
87  std::unique_ptr<LHERunInfoProduct> generateRunInfo(std::vector<std::string> const& files) const;
88 
89  // ----------member data ---------------------------
92  const std::vector<std::string> args_;
93  uint32_t npars_;
94  uint32_t nEvents_;
95  bool storeXML_;
96  unsigned int nThreads_{1};
98  bool generateConcurrently_{false};
99  const std::vector<std::string> postGenerationCommand_;
100 
101  // Used only if nPartonMapping is in the configuration
102  std::map<unsigned, std::pair<unsigned, unsigned>> nPartonMapping_{};
103 
104  std::unique_ptr<lhef::LHEReader> reader_;
105  std::shared_ptr<lhef::LHEEvent> partonLevel_;
107 
112  public:
113  explicit FileCloseSentry(int fd) : fd_(fd){};
114 
115  ~FileCloseSentry() { close(fd_); }
116 
117  //Make this noncopyable
118  FileCloseSentry(const FileCloseSentry&) = delete;
119  FileCloseSentry& operator=(const FileCloseSentry&) = delete;
120 
121  private:
122  int fd_;
123  };
124 };
125 
126 //
127 // constructors and destructor
128 //
130  : scriptName_((iConfig.getParameter<edm::FileInPath>("scriptName")).fullPath()),
131  outputFile_(iConfig.getParameter<std::string>("outputFile")),
132  args_(iConfig.getParameter<std::vector<std::string>>("args")),
133  npars_(iConfig.getParameter<uint32_t>("numberOfParameters")),
134  nEvents_(iConfig.getUntrackedParameter<uint32_t>("nEvents")),
135  storeXML_(iConfig.getUntrackedParameter<bool>("storeXML")),
136  generateConcurrently_(iConfig.getUntrackedParameter<bool>("generateConcurrently")),
137  postGenerationCommand_(iConfig.getUntrackedParameter<std::vector<std::string>>("postGenerationCommand")) {
138  if (npars_ != args_.size())
139  throw cms::Exception("ExternalLHEProducer")
140  << "Problem with configuration: " << args_.size() << " script arguments given, expected " << npars_;
141 
142  if (iConfig.exists("nPartonMapping")) {
143  auto& processMap(iConfig.getParameterSetVector("nPartonMapping"));
144  for (auto& cfg : processMap) {
145  unsigned processId(cfg.getParameter<unsigned>("idprup"));
146 
147  auto orderStr(cfg.getParameter<std::string>("order"));
148  unsigned order(0);
149  if (orderStr == "LO")
150  order = 0;
151  else if (orderStr == "NLO")
152  order = 1;
153  else
154  throw cms::Exception("ExternalLHEProducer")
155  << "Invalid order specification for process " << processId << ": " << orderStr;
156 
157  unsigned np(cfg.getParameter<unsigned>("np"));
158 
159  nPartonMapping_.emplace(processId, std::make_pair(order, np));
160  }
161  }
162 
163  xmlPutToken_ = produces<LHEXMLStringProduct, edm::Transition::BeginRun>("LHEScriptOutput");
164 
165  eventPutToken_ = produces<LHEEventProduct>();
166  beginRunPutToken_ = produces<LHERunInfoProduct, edm::Transition::BeginRun>();
167 }
168 
169 //
170 // member functions
171 //
172 
173 // ------------ method called with number of threads in job --
174 void ExternalLHEProducer::preallocThreads(unsigned int iThreads) { nThreads_ = iThreads; }
175 
176 // ------------ method called to produce the data ------------
178  nextEvent();
179  if (!partonLevel_) {
181  << "No lhe event found in ExternalLHEProducer::produce(). "
182  << "The likely cause is that the lhe file contains fewer events than were requested, which is possible "
183  << "in case of phase space integration or uneweighting efficiency problems.";
184  }
185 
186  std::unique_ptr<LHEEventProduct> product(
187  new LHEEventProduct(*partonLevel_->getHEPEUP(), partonLevel_->originalXWGTUP()));
188  if (partonLevel_->getPDF()) {
189  product->setPDF(*partonLevel_->getPDF());
190  }
191  std::for_each(partonLevel_->weights().begin(),
192  partonLevel_->weights().end(),
193  std::bind(&LHEEventProduct::addWeight, product.get(), std::placeholders::_1));
194  product->setScales(partonLevel_->scales());
195  product->setEvtNum(partonLevel_->evtnum());
196  if (nPartonMapping_.empty()) {
197  product->setNpLO(partonLevel_->npLO());
198  product->setNpNLO(partonLevel_->npNLO());
199  } else {
200  // overwrite npLO and npNLO values by user-specified mapping
201  unsigned processId(partonLevel_->getHEPEUP()->IDPRUP);
202  unsigned order(0);
203  unsigned np(0);
204  try {
205  auto procDef(nPartonMapping_.at(processId));
206  order = procDef.first;
207  np = procDef.second;
208  } catch (std::out_of_range&) {
209  throw cms::Exception("ExternalLHEProducer")
210  << "Unexpected IDPRUP encountered: " << partonLevel_->getHEPEUP()->IDPRUP;
211  }
212 
213  switch (order) {
214  case 0:
215  product->setNpLO(np);
216  product->setNpNLO(-1);
217  break;
218  case 1:
219  product->setNpLO(-1);
220  product->setNpNLO(np);
221  break;
222  default:
223  break;
224  }
225  }
226 
227  std::for_each(partonLevel_->getComments().begin(),
228  partonLevel_->getComments().end(),
229  std::bind(&LHEEventProduct::addComment, product.get(), std::placeholders::_1));
230 
231  iEvent.put(eventPutToken_, std::move(product));
232 
233  partonLevel_.reset();
234  return;
235 }
236 
237 // ------------ method called when starting to processes a run ------------
239  // pass the number of events as previous to last argument
240 
241  // pass the random number generator seed as last argument
242 
244 
245  if (!rng.isAvailable()) {
246  throw cms::Exception("Configuration")
247  << "The ExternalLHEProducer module requires the RandomNumberGeneratorService\n"
248  "which is not present in the configuration file. You must add the service\n"
249  "in the configuration file if you want to run ExternalLHEProducer";
250  }
251 
252  std::vector<std::string> infiles;
253  auto const seed = rng->mySeed();
254  if (generateConcurrently_) {
255  infiles.resize(nThreads_);
256  auto const nEventsAve = nEvents_ / nThreads_;
257  unsigned int const overflow = nThreads_ - (nEvents_ % nThreads_);
258  std::exception_ptr except;
259  std::atomic<char> exceptSet{0};
260 
261  tbb::this_task_arena::isolate([this, &except, &infiles, &exceptSet, nEventsAve, overflow, seed]() {
262  tbb::task_group group;
263  for (unsigned int t = 0; t < nThreads_; ++t) {
264  uint32_t nEvents = nEventsAve;
265  if (nEvents_ % nThreads_ != 0 and t >= overflow) {
266  nEvents += 1;
267  }
268  group.run([t, this, &infiles, seed, nEvents, &except, &exceptSet]() {
269  CMS_SA_ALLOW try {
270  using namespace std::filesystem;
271  using namespace std::string_literals;
272  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
273  infiles[t] = out.native();
274  executeScript(makeArgs(nEvents, 1, seed + t), t, false);
275  } catch (...) {
276  char expected = 0;
277  if (exceptSet.compare_exchange_strong(expected, 1)) {
278  except = std::current_exception();
279  exceptSet.store(2);
280  }
281  }
282  });
283  }
284  group.wait();
285  });
286  if (exceptSet != 0) {
287  std::rethrow_exception(except);
288  }
289  } else {
290  infiles = std::vector<std::string>(1, outputFile_);
292  }
293 
294  //run post-generation command if specified
295  if (!postGenerationCommand_.empty()) {
296  std::vector<std::string> postcmd = postGenerationCommand_;
297  try {
298  postcmd[0] = edm::FileInPath(postcmd[0]).fullPath();
299  } catch (const edm::Exception& e) {
300  edm::LogWarning("ExternalLHEProducer") << postcmd[0] << " is not a relative path. Run it as a shell command.";
301  }
302  executeScript(postcmd, 0, true);
303  }
304 
305  //fill LHEXMLProduct (streaming read directly into compressed buffer to save memory)
306  std::unique_ptr<LHEXMLStringProduct> p(new LHEXMLStringProduct);
307 
308  //store the XML file only if explictly requested
309  if (storeXML_) {
311  if (generateConcurrently_) {
312  using namespace std::filesystem;
313  file = (path("thread0") / path(outputFile_)).native();
314  } else {
315  file = outputFile_;
316  }
317  std::ifstream instream(file);
318  if (!instream) {
319  throw cms::Exception("OutputOpenError") << "Unable to open script output file " << outputFile_ << ".";
320  }
321  instream.seekg(0, instream.end);
322  int insize = instream.tellg();
323  instream.seekg(0, instream.beg);
324  p->fillCompressedContent(instream, 0.25 * insize);
325  instream.close();
326  }
327  run.put(xmlPutToken_, std::move(p));
328 
329  //Read the beginning of each file to get the run info in order to do the merge
330  auto runInfo = generateRunInfo(infiles);
331  if (runInfo) {
333  }
334 
335  // LHE C++ classes translation
336  // (read back uncompressed file from disk in streaming mode again to save memory)
337  unsigned int skip = 0;
338  reader_ = std::make_unique<lhef::LHEReader>(infiles, skip);
339 
340  nextEvent();
341 }
342 
344 // ------------ method called when ending the processing of a run ------------
346  nextEvent();
347  if (partonLevel_) {
348  // VALIDATION_RUN env variable allows to finish event processing early without errors by sending SIGINT
349  if (std::getenv("VALIDATION_RUN") != nullptr) {
350  edm::LogWarning("ExternalLHEProducer")
351  << "Event loop is over, but there are still lhe events to process, ignoring...";
352  } else {
354  << "Error in ExternalLHEProducer::endRunProduce(). "
355  << "Event loop is over, but there are still lhe events to process."
356  << "This could happen if lhe file contains more events than requested. This is never expected to happen.";
357  }
358  }
359 
360  reader_.reset();
361  if (generateConcurrently_) {
362  for (unsigned int t = 0; t < nThreads_; ++t) {
363  using namespace std::filesystem;
364  using namespace std::string_literals;
365  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
366  if (unlink(out.c_str())) {
367  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << out
368  << " (errno=" << errno << ", " << strerror(errno) << ").";
369  }
370  }
371  } else {
372  if (unlink(outputFile_.c_str())) {
373  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << outputFile_
374  << " (errno=" << errno << ", " << strerror(errno) << ").";
375  }
376  }
377 }
378 
379 std::vector<std::string> ExternalLHEProducer::makeArgs(uint32_t nEvents,
380  unsigned int nThreads,
381  std::uint32_t seed) const {
382  std::vector<std::string> args;
383  args.reserve(3 + args_.size());
384 
385  args.push_back(args_.front());
386  args.push_back(std::to_string(nEvents));
387 
388  args.push_back(std::to_string(seed));
389 
390  args.push_back(std::to_string(nThreads));
391  std::copy(args_.begin() + 1, args_.end(), std::back_inserter(args));
392 
393  for (unsigned int iArg = 0; iArg < args.size(); iArg++) {
394  LogDebug("LHEInputArgs") << "arg [" << iArg << "] = " << args[iArg];
395  }
396 
397  return args;
398 }
399 
400 // ------------ Close all the open file descriptors ------------
401 int ExternalLHEProducer::closeDescriptors(int preserve) const {
402  int maxfd = 1024;
403  int fd;
404 #ifdef __linux__
405  DIR* dir;
406  struct dirent* dp;
407  maxfd = preserve;
408  if ((dir = opendir("/proc/self/fd"))) {
409  errno = 0;
410  while ((dp = readdir(dir)) != nullptr) {
411  if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) {
412  continue;
413  }
414  if (sscanf(dp->d_name, "%d", &fd) != 1) {
415  //throw cms::Exception("closeDescriptors") << "Found unexpected filename in /proc/self/fd: " << dp->d_name;
416  return -1;
417  }
418  if (fd > maxfd) {
419  maxfd = fd;
420  }
421  }
422  if (errno) {
423  //throw cms::Exception("closeDescriptors") << "Unable to determine the number of fd (errno=" << errno << ", " << strerror(errno) << ").";
424  return errno;
425  }
426  closedir(dir);
427  }
428 #endif
429  // TODO: assert for an unreasonable number of fds?
430  for (fd = 3; fd < maxfd + 1; fd++) {
431  if (fd != preserve)
432  close(fd);
433  }
434  return 0;
435 }
436 
437 // ------------ Execute the script associated with this producer ------------
438 void ExternalLHEProducer::executeScript(std::vector<std::string> const& args, int id, bool isPost) const {
439  // Fork a script, wait until it finishes.
440 
441  int rc = 0, rc2 = 0;
442  int filedes[2], fd_flags;
443 
444  if (pipe(filedes)) {
445  throw cms::Exception("Unable to create a new pipe");
446  }
447  FileCloseSentry sentry1(filedes[0]), sentry2(filedes[1]);
448 
449  if ((fd_flags = fcntl(filedes[1], F_GETFD, NULL)) == -1) {
450  throw cms::Exception("ExternalLHEProducer")
451  << "Failed to get pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
452  }
453  if (fcntl(filedes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
454  throw cms::Exception("ExternalLHEProducer")
455  << "Failed to set pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
456  }
457 
458  unsigned int argc_pre = 0;
459  // For generation command the first argument gives to the scriptName
460  if (!isPost) {
461  argc_pre = 1;
462  }
463  unsigned int argc = argc_pre + args.size();
464  // TODO: assert that we have a reasonable number of arguments
465  char** argv = new char*[argc + 1];
466  if (!isPost) {
467  argv[0] = strdup(scriptName_.c_str());
468  }
469  for (unsigned int i = 0; i < args.size(); i++) {
470  argv[argc_pre + i] = strdup(args[i].c_str());
471  }
472  argv[argc] = nullptr;
473 
474  pid_t pid = fork();
475  if (pid == 0) {
476  // The child process
477  if (!(rc = closeDescriptors(filedes[1]))) {
478  if (!isPost && generateConcurrently_) {
479  using namespace std::filesystem;
480  using namespace std::string_literals;
481  std::error_code ec;
482  auto newDir = path("thread"s + std::to_string(id));
483  create_directory(newDir, ec);
484  current_path(newDir, ec);
485  }
486  execvp(argv[0], argv); // If execv returns, we have an error.
487  rc = errno;
488  }
489  while ((write(filedes[1], &rc, sizeof(int)) == -1) && (errno == EINTR)) {
490  }
491  _exit(1);
492  }
493 
494  // Free the arg vector ASAP
495  for (unsigned int i = 0; i < args.size() + 1; i++) {
496  free(argv[i]);
497  }
498  delete[] argv;
499 
500  if (pid == -1) {
501  throw cms::Exception("ForkException")
502  << "Unable to fork a child (errno=" << errno << ", " << strerror(errno) << ")";
503  }
504 
505  close(filedes[1]);
506  // If the exec succeeds, the read will fail.
507  while (((rc2 = read(filedes[0], &rc, sizeof(int))) == -1) && (errno == EINTR)) {
508  rc2 = 0;
509  }
510  if ((rc2 == sizeof(int)) && rc) {
511  throw cms::Exception("ExternalLHEProducer")
512  << "Failed to execute script (errno=" << rc << ", " << strerror(rc) << ")";
513  }
514  close(filedes[0]);
515 
516  int status = 0;
517  errno = 0;
518  do {
519  if (waitpid(pid, &status, 0) < 0) {
520  if (errno == EINTR) {
521  continue;
522  } else {
523  throw cms::Exception("ExternalLHEProducer")
524  << "Failed to read child status (errno=" << errno << ", " << strerror(errno) << ")";
525  }
526  }
527  if (WIFSIGNALED(status)) {
528  throw cms::Exception("ExternalLHEProducer") << "Child exited due to signal " << WTERMSIG(status) << ".";
529  }
530  if (WIFEXITED(status)) {
531  rc = WEXITSTATUS(status);
532  break;
533  }
534  } while (true);
535  if (rc) {
536  throw cms::Exception("ExternalLHEProducer") << "Child failed with exit code " << rc << ".";
537  }
538 }
539 
540 // ------------ method fills 'descriptions' with the allowed parameters for the module ------------
542  //The following says we do not know what parameters are allowed so do no validation
543  // Please change this to state exactly what you do use, even if it is no parameters
545  desc.setComment("Executes an external script and places its output file into an EDM collection");
546 
547  edm::FileInPath thePath;
548  desc.add<edm::FileInPath>("scriptName", thePath);
549  desc.add<std::string>("outputFile", "myoutput");
550  desc.add<std::vector<std::string>>("args");
551  desc.add<uint32_t>("numberOfParameters");
552  desc.addUntracked<uint32_t>("nEvents");
553  desc.addUntracked<bool>("storeXML", false);
554  desc.addUntracked<bool>("generateConcurrently", false)
555  ->setComment("If true, run the script concurrently in separate processes.");
556  desc.addUntracked<std::vector<std::string>>("postGenerationCommand", std::vector<std::string>())
557  ->setComment(
558  "Command to run after the generation script has completed. The first argument can be a relative path.");
559 
560  edm::ParameterSetDescription nPartonMappingDesc;
561  nPartonMappingDesc.add<unsigned>("idprup");
562  nPartonMappingDesc.add<std::string>("order");
563  nPartonMappingDesc.add<unsigned>("np");
564  desc.addVPSetOptional("nPartonMapping", nPartonMappingDesc);
565 
566  descriptions.addDefault(desc);
567 }
568 
569 std::unique_ptr<LHERunInfoProduct> ExternalLHEProducer::generateRunInfo(std::vector<std::string> const& iFiles) const {
570  std::unique_ptr<LHERunInfoProduct> retValue;
571  //read each file in turn and only get the header info
572  for (auto const& file : iFiles) {
573  unsigned int skip = 0;
574  std::vector<std::string> infiles(1, file);
575  auto reader = std::make_unique<lhef::LHEReader>(infiles, skip);
576  auto parton = reader->next();
577  if (!parton) {
578  break;
579  }
580  auto runInfo = parton->getRunInfo();
581  LHERunInfoProduct product(*runInfo->getHEPRUP());
582 
583  std::for_each(runInfo->getHeaders().begin(),
584  runInfo->getHeaders().end(),
585  std::bind(&LHERunInfoProduct::addHeader, &product, std::placeholders::_1));
586  std::for_each(runInfo->getComments().begin(),
587  runInfo->getComments().end(),
588  std::bind(&LHERunInfoProduct::addComment, &product, std::placeholders::_1));
589  if (not retValue) {
590  retValue = std::make_unique<LHERunInfoProduct>(std::move(product));
591  } else {
592  retValue->mergeProduct(product);
593  }
594  }
595 
596  return retValue;
597 }
598 
600  if (partonLevel_)
601  return;
602 
603  if (not reader_) {
604  return;
605  }
606 
607  partonLevel_ = reader_->next();
608  if (!partonLevel_) {
609  //see if we have another file to read;
610  bool newFileOpened;
611  do {
612  newFileOpened = false;
613  partonLevel_ = reader_->next(&newFileOpened);
614  } while (newFileOpened && !partonLevel_);
615  }
616 }
617 
618 //define this as a plug-in
void beginRunProduce(edm::Run &run, edm::EventSetup const &es) override
#define CMS_SA_ALLOW
void addHeader(const Header &header)
std::string fullPath() const
Definition: FileInPath.cc:161
void addComment(const std::string &line)
void produce(edm::Event &, const edm::EventSetup &) override
bool exists(std::string const &parameterName) const
checks if a parameter exists
void addWeight(const WGT &wgt)
std::string to_string(const V &value)
Definition: OMSAccess.h:77
#define NULL
Definition: scimark2.h:8
reader
Definition: DQM.py:105
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
edm::EDPutTokenT< LHERunInfoProduct > beginRunPutToken_
edm::EDPutTokenT< LHEEventProduct > eventPutToken_
std::unique_ptr< LHERunInfoProduct > generateRunInfo(std::vector< std::string > const &files) const
int iEvent
Definition: GenABIO.cc:224
void addDefault(ParameterSetDescription const &psetDescription)
virtual std::uint32_t mySeed() const =0
int np
Definition: AMPTWrapper.h:43
edm::EDPutTokenT< LHEXMLStringProduct > xmlPutToken_
std::unique_ptr< lhef::LHEReader > reader_
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::map< unsigned, std::pair< unsigned, unsigned > > nPartonMapping_
def pipe(cmdline, input=None)
Definition: pipe.py:5
ParameterDescriptionBase * add(U const &iLabel, T const &value)
int closeDescriptors(int preserve) const
void executeScript(std::vector< std::string > const &args, int id, bool isPost) const
const std::vector< std::string > args_
std::vector< std::string > makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const
const std::vector< std::string > postGenerationCommand_
void addComment(const std::string &line)
void beginRun(edm::Run const &, edm::EventSetup const &) override
void preallocThreads(unsigned int) override
bool mergeProduct(const LHERunInfoProduct &other)
HLT enums.
VParameterSet const & getParameterSetVector(std::string const &name) const
FileCloseSentry & operator=(const FileCloseSentry &)=delete
bool isAvailable() const
Definition: Service.h:40
Log< level::Warning, false > LogWarning
fd
Definition: ztee.py:136
ExternalLHEProducer(const edm::ParameterSet &iConfig)
def move(src, dest)
Definition: eostools.py:511
Definition: Run.h:45
std::shared_ptr< lhef::LHEEvent > partonLevel_
#define LogDebug(id)
void endRun(edm::Run const &, edm::EventSetup const &) override