CMS 3D CMS Logo

ExternalLHEProducer.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: ExternalLHEProducer
4 // Class: ExternalLHEProducer
5 //
13 //
14 // Original Author: Brian Paul Bockelman,8 R-018,+41227670861,
15 // Created: Fri Oct 21 11:37:26 CEST 2011
16 //
17 //
18 
19 // system include files
20 #include "oneapi/tbb/task_arena.h"
21 #include "oneapi/tbb/task_group.h"
22 #include <cstdio>
23 #include <cstdlib>
24 #include <dirent.h>
25 #include <fcntl.h>
26 #include <filesystem>
27 #include <fstream>
28 #include <memory>
29 #include <string>
30 #include <sys/resource.h>
31 #include <sys/time.h>
32 #include <sys/wait.h>
33 #include <system_error>
34 #include <unistd.h>
35 #include <vector>
36 
37 #include "boost/ptr_container/ptr_deque.hpp"
38 
39 // user include files
43 
47 
50 
55 
59 
62 
64 
65 //
66 // class declaration
67 //
68 
69 class ExternalLHEProducer : public edm::one::EDProducer<edm::BeginRunProducer, edm::one::WatchRuns> {
70 public:
71  explicit ExternalLHEProducer(const edm::ParameterSet& iConfig);
72 
73  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
74 
75 private:
76  void produce(edm::Event&, const edm::EventSetup&) override;
77  void beginRunProduce(edm::Run& run, edm::EventSetup const& es) override;
78  void beginRun(edm::Run const&, edm::EventSetup const&) override;
79  void endRun(edm::Run const&, edm::EventSetup const&) override;
80  void preallocThreads(unsigned int) override;
81 
82  std::vector<std::string> makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const;
83  int closeDescriptors(int preserve) const;
84  void executeScript(std::vector<std::string> const& args, int id, bool isPost) const;
85 
86  void nextEvent();
87  std::unique_ptr<LHERunInfoProduct> generateRunInfo(std::vector<std::string> const& files) const;
88 
89  // ----------member data ---------------------------
92  const std::vector<std::string> args_;
93  uint32_t npars_;
94  uint32_t nEvents_;
95  bool storeXML_;
96  unsigned int nThreads_{1};
98  bool generateConcurrently_{false};
99  const std::vector<std::string> postGenerationCommand_;
100 
101  // Used only if nPartonMapping is in the configuration
102  std::map<unsigned, std::pair<unsigned, unsigned>> nPartonMapping_{};
103 
104  std::unique_ptr<lhef::LHEReader> reader_;
105  std::shared_ptr<lhef::LHEEvent> partonLevel_;
107 
112  public:
113  explicit FileCloseSentry(int fd) : fd_(fd){};
114 
115  ~FileCloseSentry() { close(fd_); }
116 
117  //Make this noncopyable
118  FileCloseSentry(const FileCloseSentry&) = delete;
119  FileCloseSentry& operator=(const FileCloseSentry&) = delete;
120 
121  private:
122  int fd_;
123  };
124 };
125 
126 //
127 // constructors and destructor
128 //
130  : scriptName_((iConfig.getParameter<edm::FileInPath>("scriptName")).fullPath()),
131  outputFile_(iConfig.getParameter<std::string>("outputFile")),
132  args_(iConfig.getParameter<std::vector<std::string>>("args")),
133  npars_(iConfig.getParameter<uint32_t>("numberOfParameters")),
134  nEvents_(iConfig.getUntrackedParameter<uint32_t>("nEvents")),
135  storeXML_(iConfig.getUntrackedParameter<bool>("storeXML")),
136  generateConcurrently_(iConfig.getUntrackedParameter<bool>("generateConcurrently")),
137  postGenerationCommand_(iConfig.getUntrackedParameter<std::vector<std::string>>("postGenerationCommand")) {
138  if (npars_ != args_.size())
139  throw cms::Exception("ExternalLHEProducer")
140  << "Problem with configuration: " << args_.size() << " script arguments given, expected " << npars_;
141 
142  if (iConfig.exists("nPartonMapping")) {
143  auto& processMap(iConfig.getParameterSetVector("nPartonMapping"));
144  for (auto& cfg : processMap) {
145  unsigned processId(cfg.getParameter<unsigned>("idprup"));
146 
147  auto orderStr(cfg.getParameter<std::string>("order"));
148  unsigned order(0);
149  if (orderStr == "LO")
150  order = 0;
151  else if (orderStr == "NLO")
152  order = 1;
153  else
154  throw cms::Exception("ExternalLHEProducer")
155  << "Invalid order specification for process " << processId << ": " << orderStr;
156 
157  unsigned np(cfg.getParameter<unsigned>("np"));
158 
159  nPartonMapping_.emplace(processId, std::make_pair(order, np));
160  }
161  }
162 
163  xmlPutToken_ = produces<LHEXMLStringProduct, edm::Transition::BeginRun>("LHEScriptOutput");
164 
165  eventPutToken_ = produces<LHEEventProduct>();
166  beginRunPutToken_ = produces<LHERunInfoProduct, edm::Transition::BeginRun>();
167 }
168 
169 //
170 // member functions
171 //
172 
173 // ------------ method called with number of threads in job --
174 void ExternalLHEProducer::preallocThreads(unsigned int iThreads) { nThreads_ = iThreads; }
175 
176 // ------------ method called to produce the data ------------
178  nextEvent();
179  if (!partonLevel_) {
181  << "No lhe event found in ExternalLHEProducer::produce(). "
182  << "The likely cause is that the lhe file contains fewer events than were requested, which is possible "
183  << "in case of phase space integration or uneweighting efficiency problems.";
184  }
185 
186  std::unique_ptr<LHEEventProduct> product(
187  new LHEEventProduct(*partonLevel_->getHEPEUP(), partonLevel_->originalXWGTUP()));
188  if (partonLevel_->getPDF()) {
189  product->setPDF(*partonLevel_->getPDF());
190  }
191  std::for_each(partonLevel_->weights().begin(),
192  partonLevel_->weights().end(),
193  std::bind(&LHEEventProduct::addWeight, product.get(), std::placeholders::_1));
194  product->setScales(partonLevel_->scales());
195  if (nPartonMapping_.empty()) {
196  product->setNpLO(partonLevel_->npLO());
197  product->setNpNLO(partonLevel_->npNLO());
198  } else {
199  // overwrite npLO and npNLO values by user-specified mapping
200  unsigned processId(partonLevel_->getHEPEUP()->IDPRUP);
201  unsigned order(0);
202  unsigned np(0);
203  try {
204  auto procDef(nPartonMapping_.at(processId));
205  order = procDef.first;
206  np = procDef.second;
207  } catch (std::out_of_range&) {
208  throw cms::Exception("ExternalLHEProducer")
209  << "Unexpected IDPRUP encountered: " << partonLevel_->getHEPEUP()->IDPRUP;
210  }
211 
212  switch (order) {
213  case 0:
214  product->setNpLO(np);
215  product->setNpNLO(-1);
216  break;
217  case 1:
218  product->setNpLO(-1);
219  product->setNpNLO(np);
220  break;
221  default:
222  break;
223  }
224  }
225 
226  std::for_each(partonLevel_->getComments().begin(),
227  partonLevel_->getComments().end(),
228  std::bind(&LHEEventProduct::addComment, product.get(), std::placeholders::_1));
229 
230  iEvent.put(eventPutToken_, std::move(product));
231 
232  partonLevel_.reset();
233  return;
234 }
235 
236 // ------------ method called when starting to process a run ------------
238  // pass the number of events as previous to last argument
239 
240  // pass the random number generator seed as last argument
241 
243 
244  if (!rng.isAvailable()) {
245  throw cms::Exception("Configuration")
246  << "The ExternalLHEProducer module requires the RandomNumberGeneratorService\n"
247  "which is not present in the configuration file. You must add the service\n"
248  "in the configuration file if you want to run ExternalLHEProducer";
249  }
250 
251  std::vector<std::string> infiles;
252  auto const seed = rng->mySeed();
253  if (generateConcurrently_) {
254  infiles.resize(nThreads_);
255  auto const nEventsAve = nEvents_ / nThreads_;
256  unsigned int const overflow = nThreads_ - (nEvents_ % nThreads_);
257  std::exception_ptr except;
258  std::atomic<char> exceptSet{0};
259 
260  tbb::this_task_arena::isolate([this, &except, &infiles, &exceptSet, nEventsAve, overflow, seed]() {
261  tbb::task_group group;
262  for (unsigned int t = 0; t < nThreads_; ++t) {
263  uint32_t nEvents = nEventsAve;
264  if (nEvents_ % nThreads_ != 0 and t >= overflow) {
265  nEvents += 1;
266  }
267  group.run([t, this, &infiles, seed, nEvents, &except, &exceptSet]() {
268  CMS_SA_ALLOW try {
269  using namespace std::filesystem;
270  using namespace std::string_literals;
271  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
272  infiles[t] = out.native();
273  executeScript(makeArgs(nEvents, 1, seed + t), t, false);
274  } catch (...) {
275  char expected = 0;
276  if (exceptSet.compare_exchange_strong(expected, 1)) {
277  except = std::current_exception();
278  exceptSet.store(2);
279  }
280  }
281  });
282  }
283  group.wait();
284  });
285  if (exceptSet != 0) {
286  std::rethrow_exception(except);
287  }
288  } else {
289  infiles = std::vector<std::string>(1, outputFile_);
291  }
292 
293  //run post-generation command if specified
294  if (!postGenerationCommand_.empty()) {
295  std::vector<std::string> postcmd = postGenerationCommand_;
296  try {
297  postcmd[0] = edm::FileInPath(postcmd[0]).fullPath();
298  } catch (const edm::Exception& e) {
299  edm::LogWarning("ExternalLHEProducer") << postcmd[0] << " is not a relative path. Run it as a shell command.";
300  }
301  executeScript(postcmd, 0, true);
302  }
303 
304  //fill LHEXMLProduct (streaming read directly into compressed buffer to save memory)
305  std::unique_ptr<LHEXMLStringProduct> p(new LHEXMLStringProduct);
306 
307 //store the XML file only if explicitly requested
308  if (storeXML_) {
310  if (generateConcurrently_) {
311  using namespace std::filesystem;
312  file = (path("thread0") / path(outputFile_)).native();
313  } else {
314  file = outputFile_;
315  }
316  std::ifstream instream(file);
317  if (!instream) {
318  throw cms::Exception("OutputOpenError") << "Unable to open script output file " << outputFile_ << ".";
319  }
320  instream.seekg(0, instream.end);
321  int insize = instream.tellg();
322  instream.seekg(0, instream.beg);
323  p->fillCompressedContent(instream, 0.25 * insize);
324  instream.close();
325  }
326  run.put(xmlPutToken_, std::move(p));
327 
328  //Read the beginning of each file to get the run info in order to do the merge
329  auto runInfo = generateRunInfo(infiles);
330  if (runInfo) {
332  }
333 
334  // LHE C++ classes translation
335  // (read back uncompressed file from disk in streaming mode again to save memory)
336  unsigned int skip = 0;
337  reader_ = std::make_unique<lhef::LHEReader>(infiles, skip);
338 
339  nextEvent();
340 }
341 
343 // ------------ method called when ending the processing of a run ------------
345  nextEvent();
346  if (partonLevel_) {
347  // VALIDATION_RUN env variable allows to finish event processing early without errors by sending SIGINT
348  if (std::getenv("VALIDATION_RUN") != nullptr) {
349  edm::LogWarning("ExternalLHEProducer")
350  << "Event loop is over, but there are still lhe events to process, ignoring...";
351  } else {
353  << "Error in ExternalLHEProducer::endRunProduce(). "
354  << "Event loop is over, but there are still lhe events to process."
355  << "This could happen if lhe file contains more events than requested. This is never expected to happen.";
356  }
357  }
358 
359  reader_.reset();
360  if (generateConcurrently_) {
361  for (unsigned int t = 0; t < nThreads_; ++t) {
362  using namespace std::filesystem;
363  using namespace std::string_literals;
364  auto out = path("thread"s + std::to_string(t)) / path(outputFile_);
365  if (unlink(out.c_str())) {
366  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << out
367  << " (errno=" << errno << ", " << strerror(errno) << ").";
368  }
369  }
370  } else {
371  if (unlink(outputFile_.c_str())) {
372  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << outputFile_
373  << " (errno=" << errno << ", " << strerror(errno) << ").";
374  }
375  }
376 }
377 
378 std::vector<std::string> ExternalLHEProducer::makeArgs(uint32_t nEvents,
379  unsigned int nThreads,
380  std::uint32_t seed) const {
381  std::vector<std::string> args;
382  args.reserve(3 + args_.size());
383 
384  args.push_back(args_.front());
385  args.push_back(std::to_string(nEvents));
386 
387  args.push_back(std::to_string(seed));
388 
389  args.push_back(std::to_string(nThreads));
390  std::copy(args_.begin() + 1, args_.end(), std::back_inserter(args));
391 
392  for (unsigned int iArg = 0; iArg < args.size(); iArg++) {
393  LogDebug("LHEInputArgs") << "arg [" << iArg << "] = " << args[iArg];
394  }
395 
396  return args;
397 }
398 
399 // ------------ Close all the open file descriptors ------------
400 int ExternalLHEProducer::closeDescriptors(int preserve) const {
401  int maxfd = 1024;
402  int fd;
403 #ifdef __linux__
404  DIR* dir;
405  struct dirent* dp;
406  maxfd = preserve;
407  if ((dir = opendir("/proc/self/fd"))) {
408  errno = 0;
409  while ((dp = readdir(dir)) != nullptr) {
410  if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) {
411  continue;
412  }
413  if (sscanf(dp->d_name, "%d", &fd) != 1) {
414  //throw cms::Exception("closeDescriptors") << "Found unexpected filename in /proc/self/fd: " << dp->d_name;
415  return -1;
416  }
417  if (fd > maxfd) {
418  maxfd = fd;
419  }
420  }
421  if (errno) {
422  //throw cms::Exception("closeDescriptors") << "Unable to determine the number of fd (errno=" << errno << ", " << strerror(errno) << ").";
423  return errno;
424  }
425  closedir(dir);
426  }
427 #endif
428  // TODO: assert for an unreasonable number of fds?
429  for (fd = 3; fd < maxfd + 1; fd++) {
430  if (fd != preserve)
431  close(fd);
432  }
433  return 0;
434 }
435 
436 // ------------ Execute the script associated with this producer ------------
437 void ExternalLHEProducer::executeScript(std::vector<std::string> const& args, int id, bool isPost) const {
438  // Fork a script, wait until it finishes.
439 
440  int rc = 0, rc2 = 0;
441  int filedes[2], fd_flags;
442 
443  if (pipe(filedes)) {
444  throw cms::Exception("Unable to create a new pipe");
445  }
446  FileCloseSentry sentry1(filedes[0]), sentry2(filedes[1]);
447 
448  if ((fd_flags = fcntl(filedes[1], F_GETFD, NULL)) == -1) {
449  throw cms::Exception("ExternalLHEProducer")
450  << "Failed to get pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
451  }
452  if (fcntl(filedes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
453  throw cms::Exception("ExternalLHEProducer")
454  << "Failed to set pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
455  }
456 
457  unsigned int argc_pre = 0;
458  // For generation command the first argument gives to the scriptName
459  if (!isPost) {
460  argc_pre = 1;
461  }
462  unsigned int argc = argc_pre + args.size();
463  // TODO: assert that we have a reasonable number of arguments
464  char** argv = new char*[argc + 1];
465  if (!isPost) {
466  argv[0] = strdup(scriptName_.c_str());
467  }
468  for (unsigned int i = 0; i < args.size(); i++) {
469  argv[argc_pre + i] = strdup(args[i].c_str());
470  }
471  argv[argc] = nullptr;
472 
473  pid_t pid = fork();
474  if (pid == 0) {
475  // The child process
476  if (!(rc = closeDescriptors(filedes[1]))) {
477  if (!isPost && generateConcurrently_) {
478  using namespace std::filesystem;
479  using namespace std::string_literals;
480  std::error_code ec;
481  auto newDir = path("thread"s + std::to_string(id));
482  create_directory(newDir, ec);
483  current_path(newDir, ec);
484  }
485  execvp(argv[0], argv); // If execv returns, we have an error.
486  rc = errno;
487  }
488  while ((write(filedes[1], &rc, sizeof(int)) == -1) && (errno == EINTR)) {
489  }
490  _exit(1);
491  }
492 
493  // Free the arg vector ASAP
494  for (unsigned int i = 0; i < args.size() + 1; i++) {
495  free(argv[i]);
496  }
497  delete[] argv;
498 
499  if (pid == -1) {
500  throw cms::Exception("ForkException")
501  << "Unable to fork a child (errno=" << errno << ", " << strerror(errno) << ")";
502  }
503 
504  close(filedes[1]);
505  // If the exec succeeds, the read will fail.
506  while (((rc2 = read(filedes[0], &rc, sizeof(int))) == -1) && (errno == EINTR)) {
507  rc2 = 0;
508  }
509  if ((rc2 == sizeof(int)) && rc) {
510  throw cms::Exception("ExternalLHEProducer")
511  << "Failed to execute script (errno=" << rc << ", " << strerror(rc) << ")";
512  }
513  close(filedes[0]);
514 
515  int status = 0;
516  errno = 0;
517  do {
518  if (waitpid(pid, &status, 0) < 0) {
519  if (errno == EINTR) {
520  continue;
521  } else {
522  throw cms::Exception("ExternalLHEProducer")
523  << "Failed to read child status (errno=" << errno << ", " << strerror(errno) << ")";
524  }
525  }
526  if (WIFSIGNALED(status)) {
527  throw cms::Exception("ExternalLHEProducer") << "Child exited due to signal " << WTERMSIG(status) << ".";
528  }
529  if (WIFEXITED(status)) {
530  rc = WEXITSTATUS(status);
531  break;
532  }
533  } while (true);
534  if (rc) {
535  throw cms::Exception("ExternalLHEProducer") << "Child failed with exit code " << rc << ".";
536  }
537 }
538 
539 // ------------ method fills 'descriptions' with the allowed parameters for the module ------------
541  //The following says we do not know what parameters are allowed so do no validation
542  // Please change this to state exactly what you do use, even if it is no parameters
544  desc.setComment("Executes an external script and places its output file into an EDM collection");
545 
546  edm::FileInPath thePath;
547  desc.add<edm::FileInPath>("scriptName", thePath);
548  desc.add<std::string>("outputFile", "myoutput");
549  desc.add<std::vector<std::string>>("args");
550  desc.add<uint32_t>("numberOfParameters");
551  desc.addUntracked<uint32_t>("nEvents");
552  desc.addUntracked<bool>("storeXML", false);
553  desc.addUntracked<bool>("generateConcurrently", false)
554  ->setComment("If true, run the script concurrently in separate processes.");
555  desc.addUntracked<std::vector<std::string>>("postGenerationCommand", std::vector<std::string>())
556  ->setComment(
557  "Command to run after the generation script has completed. The first argument can be a relative path.");
558 
559  edm::ParameterSetDescription nPartonMappingDesc;
560  nPartonMappingDesc.add<unsigned>("idprup");
561  nPartonMappingDesc.add<std::string>("order");
562  nPartonMappingDesc.add<unsigned>("np");
563  desc.addVPSetOptional("nPartonMapping", nPartonMappingDesc);
564 
565  descriptions.addDefault(desc);
566 }
567 
568 std::unique_ptr<LHERunInfoProduct> ExternalLHEProducer::generateRunInfo(std::vector<std::string> const& iFiles) const {
569  std::unique_ptr<LHERunInfoProduct> retValue;
570  //read each file in turn and only get the header info
571  for (auto const& file : iFiles) {
572  unsigned int skip = 0;
573  std::vector<std::string> infiles(1, file);
574  auto reader = std::make_unique<lhef::LHEReader>(infiles, skip);
575  auto parton = reader->next();
576  if (!parton) {
577  break;
578  }
579  auto runInfo = parton->getRunInfo();
580  LHERunInfoProduct product(*runInfo->getHEPRUP());
581 
582  std::for_each(runInfo->getHeaders().begin(),
583  runInfo->getHeaders().end(),
584  std::bind(&LHERunInfoProduct::addHeader, &product, std::placeholders::_1));
585  std::for_each(runInfo->getComments().begin(),
586  runInfo->getComments().end(),
587  std::bind(&LHERunInfoProduct::addComment, &product, std::placeholders::_1));
588  if (not retValue) {
589  retValue = std::make_unique<LHERunInfoProduct>(std::move(product));
590  } else {
591  retValue->mergeProduct(product);
592  }
593  }
594 
595  return retValue;
596 }
597 
599  if (partonLevel_)
600  return;
601 
602  if (not reader_) {
603  return;
604  }
605 
606  partonLevel_ = reader_->next();
607  if (!partonLevel_) {
608  //see if we have another file to read;
609  bool newFileOpened;
610  do {
611  newFileOpened = false;
612  partonLevel_ = reader_->next(&newFileOpened);
613  } while (newFileOpened && !partonLevel_);
614  }
615 }
616 
617 //define this as a plug-in
void beginRunProduce(edm::Run &run, edm::EventSetup const &es) override
#define CMS_SA_ALLOW
void addHeader(const Header &header)
std::string fullPath() const
Definition: FileInPath.cc:161
void addComment(const std::string &line)
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
void produce(edm::Event &, const edm::EventSetup &) override
bool exists(std::string const &parameterName) const
checks if a parameter exists
void addWeight(const WGT &wgt)
std::string to_string(const V &value)
Definition: OMSAccess.h:71
#define NULL
Definition: scimark2.h:8
reader
Definition: DQM.py:105
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
edm::EDPutTokenT< LHERunInfoProduct > beginRunPutToken_
edm::EDPutTokenT< LHEEventProduct > eventPutToken_
std::unique_ptr< LHERunInfoProduct > generateRunInfo(std::vector< std::string > const &files) const
int iEvent
Definition: GenABIO.cc:224
void addDefault(ParameterSetDescription const &psetDescription)
virtual std::uint32_t mySeed() const =0
int np
Definition: AMPTWrapper.h:43
edm::EDPutTokenT< LHEXMLStringProduct > xmlPutToken_
std::unique_ptr< lhef::LHEReader > reader_
std::map< unsigned, std::pair< unsigned, unsigned > > nPartonMapping_
def pipe(cmdline, input=None)
Definition: pipe.py:5
ParameterDescriptionBase * add(U const &iLabel, T const &value)
int closeDescriptors(int preserve) const
void executeScript(std::vector< std::string > const &args, int id, bool isPost) const
const std::vector< std::string > args_
std::vector< std::string > makeArgs(uint32_t nEvents, unsigned int nThreads, std::uint32_t seed) const
const std::vector< std::string > postGenerationCommand_
void addComment(const std::string &line)
void beginRun(edm::Run const &, edm::EventSetup const &) override
void preallocThreads(unsigned int) override
bool mergeProduct(const LHERunInfoProduct &other)
HLT enums.
VParameterSet const & getParameterSetVector(std::string const &name) const
FileCloseSentry & operator=(const FileCloseSentry &)=delete
bool isAvailable() const
Definition: Service.h:40
Log< level::Warning, false > LogWarning
fd
Definition: ztee.py:136
ExternalLHEProducer(const edm::ParameterSet &iConfig)
def move(src, dest)
Definition: eostools.py:511
Definition: Run.h:45
std::shared_ptr< lhef::LHEEvent > partonLevel_
#define LogDebug(id)
void endRun(edm::Run const &, edm::EventSetup const &) override