CMS 3D CMS Logo

ExternalLHEProducer.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: ExternalLHEProducer
4 // Class: ExternalLHEProducer
5 //
13 //
14 // Original Author: Brian Paul Bockelman,8 R-018,+41227670861,
15 // Created: Fri Oct 21 11:37:26 CEST 2011
16 //
17 //
18 
19 
20 // system include files
21 #include <cstdio>
22 #include <memory>
23 #include <vector>
24 #include <string>
25 #include <fstream>
26 #include <unistd.h>
27 #include <dirent.h>
28 #include <fcntl.h>
29 #include <sys/wait.h>
30 #include <sys/time.h>
31 #include <sys/resource.h>
32 
33 
34 #include "boost/bind.hpp"
35 #include "boost/shared_ptr.hpp"
36 #include "boost/ptr_container/ptr_deque.hpp"
37 
38 // user include files
42 
46 
49 
54 
58 
62 
64 
65 //
66 // class declaration
67 //
68 
// Producer module that runs an external script (executeScript) and reads the
// file it writes back through an lhef::LHEReader. It registers an
// LHEEventProduct per event, LHERunInfoProduct at begin and end of run, and an
// LHEXMLStringProduct labelled "LHEScriptOutput" at begin of run.
69 class ExternalLHEProducer : public edm::one::EDProducer<edm::BeginRunProducer,
70  edm::EndRunProducer> {
71 public:
72  explicit ExternalLHEProducer(const edm::ParameterSet& iConfig);
73  ~ExternalLHEProducer() override;
74 
 // Describes the configuration parameters read by the constructor.
75  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
76 
77 private:
78 
79  void produce(edm::Event&, const edm::EventSetup&) override;
80  void beginRunProduce(edm::Run& run, edm::EventSetup const& es) override;
81  void endRunProduce(edm::Run&, edm::EventSetup const&) override;
82  void preallocThreads(unsigned int) override;
83 
 // Closes descriptors >= 3 except `preserve`; called in the forked child.
84  int closeDescriptors(int preserve);
 // Forks, execs the script with args_, and waits for it to finish.
85  void executeScript();
 // Reads the whole output file into a string and deletes the file.
86  std::unique_ptr<std::string> readOutput();
87 
 // Fetches the next event from reader_ into partonLevel if none is pending.
88  void nextEvent();
89 
90  // ----------member data ---------------------------
 // Script arguments from the "args" parameter; beginRunProduce inserts
 // the event count, seed and thread count into it.
93  std::vector<std::string> args_;
 // Expected size of args_ ("numberOfParameters"); checked in the constructor.
94  uint32_t npars_;
 // Untracked "nEvents" parameter, passed to the script as an argument.
95  uint32_t nEvents_;
 // Untracked "storeXML" parameter: when true the output file content is
 // stored in the LHEXMLStringProduct.
96  bool storeXML_;
 // Thread count received in preallocThreads; passed to the script.
97  unsigned int nThreads_{1};
99 
 // Reader over the script output; created in beginRunProduce, reset in endRunProduce.
100  std::unique_ptr<lhef::LHEReader> reader_;
 // Run info of the most recently read event (updated in nextEvent).
101  boost::shared_ptr<lhef::LHERunInfo> runInfoLast;
 // Set by nextEvent when the run info differs from runInfoLast; reset once consumed.
102  boost::shared_ptr<lhef::LHERunInfo> runInfo;
 // Event read by nextEvent and not yet consumed by produce.
103  boost::shared_ptr<lhef::LHEEvent> partonLevel;
 // Copies of run info products kept for merging; the front one is put in endRunProduce.
104  boost::ptr_deque<LHERunInfoProduct> runInfoProducts;
 // True once produce() has replaced the front of runInfoProducts.
105  bool wasMerged;
106 
 // Small helper that holds a file descriptor.
 // NOTE(review): the line introducing the block around close(fd_) is not
 // visible here; presumably it is the destructor - confirm.
107  class FileCloseSentry : private boost::noncopyable {
108  public:
109  explicit FileCloseSentry(int fd) : fd_(fd) {};
110 
112  close(fd_);
113  }
114  private:
115  int fd_;
116  };
117 
118 };
119 
120 //
121 // constants, enums and typedefs
122 //
123 
124 
125 //
126 // static data member definitions
127 //
128 
129 //
130 // constructors and destructor
131 //
133  scriptName_((iConfig.getParameter<edm::FileInPath>("scriptName")).fullPath()),
134  outputFile_(iConfig.getParameter<std::string>("outputFile")),
135  args_(iConfig.getParameter<std::vector<std::string> >("args")),
136  npars_(iConfig.getParameter<uint32_t>("numberOfParameters")),
137  nEvents_(iConfig.getUntrackedParameter<uint32_t>("nEvents")),
138  storeXML_(iConfig.getUntrackedParameter<bool>("storeXML"))
139 {
 // Reject configurations whose declared parameter count does not match
 // the number of script arguments actually supplied.
140  if (npars_ != args_.size())
141  throw cms::Exception("ExternalLHEProducer") << "Problem with configuration: " << args_.size() << " script arguments given, expected " << npars_;
 // Begin-run product with instance label "LHEScriptOutput".
142  produces<LHEXMLStringProduct, edm::Transition::BeginRun>("LHEScriptOutput");
143 
 // One event product, plus run info products at both run transitions.
144  produces<LHEEventProduct>();
145  produces<LHERunInfoProduct, edm::Transition::BeginRun>();
146  produces<LHERunInfoProduct, edm::Transition::EndRun>();
147 }
148 
149 
151 {
 // Intentionally empty. NOTE(review): presumably the destructor body (its
 // signature line is not visible here) - confirm.
152 }
153 
154 
155 //
156 // member functions
157 //
158 
159 // ------------ method called with number of threads in job --
160 void
162 {
 // Remember the thread count; beginRunProduce passes it on to the script
 // as one of its arguments.
163  nThreads_ = iThreads;
164 }
165 
166 // ------------ method called to produce the data ------------
167 void
169 {
170  nextEvent();
171  if (!partonLevel) {
172  throw cms::Exception("ExternalLHEProducer") << "No lhe event found in ExternalLHEProducer::produce(). "
173  << "The likely cause is that the lhe file contains fewer events than were requested, which is possible "
174  << "in case of phase space integration or uneweighting efficiency problems.";
175  }
176 
177  std::unique_ptr<LHEEventProduct> product(
178  new LHEEventProduct(*partonLevel->getHEPEUP(),
179  partonLevel->originalXWGTUP())
180  );
181  if (partonLevel->getPDF()) {
182  product->setPDF(*partonLevel->getPDF());
183  }
184  std::for_each(partonLevel->weights().begin(),
185  partonLevel->weights().end(),
186  boost::bind(&LHEEventProduct::addWeight,
187  product.get(), _1));
188  product->setScales(partonLevel->scales());
189  product->setNpLO(partonLevel->npLO());
190  product->setNpNLO(partonLevel->npNLO());
191  std::for_each(partonLevel->getComments().begin(),
192  partonLevel->getComments().end(),
193  boost::bind(&LHEEventProduct::addComment,
194  product.get(), _1));
195 
196  iEvent.put(std::move(product));
197 
198  if (runInfo) {
199  std::auto_ptr<LHERunInfoProduct> product(new LHERunInfoProduct(*runInfo->getHEPRUP()));
200  std::for_each(runInfo->getHeaders().begin(),
201  runInfo->getHeaders().end(),
202  boost::bind(&LHERunInfoProduct::addHeader,
203  product.get(), _1));
204  std::for_each(runInfo->getComments().begin(),
205  runInfo->getComments().end(),
206  boost::bind(&LHERunInfoProduct::addComment,
207  product.get(), _1));
208 
209  if (!runInfoProducts.empty()) {
210  runInfoProducts.front().mergeProduct(*product);
211  if (!wasMerged) {
212  runInfoProducts.pop_front();
213  runInfoProducts.push_front(product);
214  wasMerged = true;
215  }
216  }
217 
218  runInfo.reset();
219  }
220 
221  partonLevel.reset();
222  return;
223 }
224 
225 // ------------ method called when starting to process a run ------------
// Runs the external script for this run and publishes its output:
//  1. inserts the event count, RNG seed and thread count into args_,
//  2. executes the script,
//  3. puts an LHEXMLStringProduct (filled from the output file only when
//     storeXML_ is set) under the label "LHEScriptOutput",
//  4. opens an LHEReader on the output file, reads the first event and puts
//     the resulting LHERunInfoProduct into the run.
// NOTE(review): args_ is a data member and every call inserts three more
// entries, so a second run in the same job would presumably see a grown
// argument list - confirm that only one run per job is expected.
226 void
228 {
229 
230  // insert the number of events at index 1 of args_
 // NOTE(review): begin() + 1 is only valid if args_ is non-empty; nothing
 // visible here guarantees that - confirm.
231 
232  std::ostringstream eventStream;
233  eventStream << nEvents_;
234  // args_.push_back(eventStream.str());
235  args_.insert(args_.begin() + 1, eventStream.str());
236 
237  // insert the random number generator seed at index 2 of args_
238 
240 
241  if ( ! rng.isAvailable()) {
242  throw cms::Exception("Configuration")
243  << "The ExternalLHEProducer module requires the RandomNumberGeneratorService\n"
244  "which is not present in the configuration file. You must add the service\n"
245  "in the configuration file if you want to run ExternalLHEProducer";
246  }
247  std::ostringstream randomStream;
248  randomStream << rng->mySeed();
249  // args_.push_back(randomStream.str());
250  args_.insert(args_.begin() + 2, randomStream.str());
251 
 // insert the number of threads at index 3 of args_
252  // args_.emplace_back(std::to_string(nThreads_));
253  args_.insert(args_.begin() + 3, std::to_string(nThreads_));
254 
255  for ( unsigned int iArg = 0; iArg < args_.size() ; iArg++ ) {
256  LogDebug("LHEInputArgs") << "arg [" << iArg << "] = " << args_[iArg];
257  }
258 
259  executeScript();
260 
261  //fill LHEXMLProduct (streaming read directly into compressed buffer to save memory)
262  std::unique_ptr<LHEXMLStringProduct> p(new LHEXMLStringProduct);
263 
264  //store the XML file only if explicitly requested
265  if (storeXML_) {
266  std::ifstream instream(outputFile_);
267  if (!instream) {
268  throw cms::Exception("OutputOpenError") << "Unable to open script output file " << outputFile_ << ".";
269  }
 // Determine the file size by seeking to the end, then rewind.
 // NOTE(review): tellg() is narrowed to int, which would overflow for
 // files larger than INT_MAX bytes - confirm expected output sizes.
270  instream.seekg (0, instream.end);
271  int insize = instream.tellg();
272  instream.seekg (0, instream.beg);
 // The second argument is a quarter of the file size.
273  p->fillCompressedContent(instream, 0.25*insize);
274  instream.close();
275  }
 // The product is put even when storeXML_ is false (then left unfilled).
276  run.put(std::move(p), "LHEScriptOutput");
277 
278  // LHE C++ classes translation
279  // (read back uncompressed file from disk in streaming mode again to save memory)
280 
281  std::vector<std::string> infiles(1, outputFile_);
282  unsigned int skip = 0;
283  reader_ = std::make_unique<lhef::LHEReader>(infiles, skip);
284 
 // Reading the first event also sets runInfoLast (see nextEvent).
285  nextEvent();
 // NOTE(review): the condition tests runInfoLast while the body reads
 // runInfo; a statement between them does not appear here - confirm.
286  if (runInfoLast) {
288 
289  std::unique_ptr<LHERunInfoProduct> product(new LHERunInfoProduct(*runInfo->getHEPRUP()));
290  std::for_each(runInfo->getHeaders().begin(),
291  runInfo->getHeaders().end(),
292  boost::bind(&LHERunInfoProduct::addHeader,
293  product.get(), _1));
294  std::for_each(runInfo->getComments().begin(),
295  runInfo->getComments().end(),
296  boost::bind(&LHERunInfoProduct::addComment,
297  product.get(), _1));
298 
299  // keep a copy around in case of merging
300  runInfoProducts.push_back(new LHERunInfoProduct(*product));
301  wasMerged = false;
302 
303  run.put(std::move(product));
304 
305  runInfo.reset();
306  }
307 
308 }
309 
310 // ------------ method called when ending the processing of a run ------------
311 void
313 {
314 
315  if (!runInfoProducts.empty()) {
316  std::unique_ptr<LHERunInfoProduct> product(runInfoProducts.pop_front().release());
317  run.put(std::move(product));
318  }
319 
320  nextEvent();
321  if (partonLevel) {
322  throw cms::Exception("ExternalLHEProducer") << "Error in ExternalLHEProducer::endRunProduce(). "
323  << "Event loop is over, but there are still lhe events to process."
324  << "This could happen if lhe file contains more events than requested. This is never expected to happen.";
325  }
326 
327  reader_.reset();
328 
329  if (unlink(outputFile_.c_str())) {
330  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << outputFile_ << " (errno=" << errno << ", " << strerror(errno) << ").";
331  }
332 
333 }
334 
335 // ------------ Close all the open file descriptors ------------
336 int
338 {
339  int maxfd = 1024;
340  int fd;
341 #ifdef __linux__
342  DIR * dir;
343  struct dirent *dp;
344  maxfd = preserve;
345  if ((dir = opendir("/proc/self/fd"))) {
346  errno = 0;
347  while ((dp = readdir (dir)) != nullptr) {
348  if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) {
349  continue;
350  }
351  if (sscanf(dp->d_name, "%d", &fd) != 1) {
352  //throw cms::Exception("closeDescriptors") << "Found unexpected filename in /proc/self/fd: " << dp->d_name;
353  return -1;
354  }
355  if (fd > maxfd) {
356  maxfd = fd;
357  }
358  }
359  if (errno) {
360  //throw cms::Exception("closeDescriptors") << "Unable to determine the number of fd (errno=" << errno << ", " << strerror(errno) << ").";
361  return errno;
362  }
363  closedir(dir);
364  }
365 #endif
366  // TODO: assert for an unreasonable number of fds?
367  for (fd=3; fd<maxfd+1; fd++) {
368  if (fd != preserve)
369  close(fd);
370  }
371  return 0;
372 }
373 
374 // ------------ Execute the script associated with this producer ------------
// Forks a child that execs scriptName_ with args_ and waits for it to exit.
// A pipe with FD_CLOEXEC on its write end is used to report an exec failure
// from the child to the parent. Throws cms::Exception on pipe/fcntl/fork
// failure, exec failure, death by signal, or non-zero exit code.
375 void
377 {
378 
379  // Fork a script, wait until it finishes.
380 
381  int rc = 0, rc2 = 0;
382  int filedes[2], fd_flags;
383  unsigned int argc;
384 
 // NOTE(review): the text is passed as the constructor argument here, unlike
 // the other throws, which pass a category and stream the message - confirm.
385  if (pipe(filedes)) {
386  throw cms::Exception("Unable to create a new pipe");
387  }
 // NOTE(review): both pipe ends are handed to sentries and are also closed
 // explicitly further down; FileCloseSentry's close(fd_) presumably runs
 // again at scope exit, i.e. a possible double close - confirm.
388  FileCloseSentry sentry1(filedes[0]), sentry2(filedes[1]);
389 
 // NOTE(review): rc is still 0 at this point (initialised above and not yet
 // assigned), so both messages below report errno=0; errno was presumably
 // intended.
390  if ((fd_flags = fcntl(filedes[1], F_GETFD, NULL)) == -1) {
391  throw cms::Exception("ExternalLHEProducer") << "Failed to get pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
392  }
393  if (fcntl(filedes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
394  throw cms::Exception("ExternalLHEProducer") << "Failed to set pipe file descriptor flags (errno=" << rc << ", " << strerror(rc) << ")";
395  }
396 
 // Build a NULL-terminated argv: script name followed by args_.
397  argc = 1 + args_.size();
398  // TODO: assert that we have a reasonable number of arguments
399  char **argv = new char *[argc+1];
400  argv[0] = strdup(scriptName_.c_str());
401  for (unsigned int i=1; i<argc; i++) {
402  argv[i] = strdup(args_[i-1].c_str());
403  }
404  argv[argc] = nullptr;
405 
406  pid_t pid = fork();
407  if (pid == 0) {
408  // The child process
 // Close inherited descriptors except the pipe's write end, then exec.
409  if (!(rc = closeDescriptors(filedes[1]))) {
410  execvp(argv[0], argv); // If execv returns, we have an error.
411  rc = errno;
412  }
 // Only reached on failure: send the error code to the parent, retrying on EINTR.
413  while ((write(filedes[1], &rc, sizeof(int)) == -1) && (errno == EINTR)) {}
414  _exit(1);
415  }
416 
417  // Free the arg vector ASAP
 // args_.size()+1 equals argc, so every strdup'ed entry is freed.
418  for (unsigned int i=0; i<args_.size()+1; i++) {
419  free(argv[i]);
420  }
421  delete [] argv;
422 
423  if (pid == -1) {
424  throw cms::Exception("ForkException") << "Unable to fork a child (errno=" << errno << ", " << strerror(errno) << ")";
425  }
426 
427  close(filedes[1]);
428  // If the exec succeeds, the read will fail.
 // A full int read with a non-zero value means the child reported an error.
429  while (((rc2 = read(filedes[0], &rc, sizeof(int))) == -1) && (errno == EINTR)) { rc2 = 0; }
430  if ((rc2 == sizeof(int)) && rc) {
431  throw cms::Exception("ExternalLHEProducer") << "Failed to execute script (errno=" << rc << ", " << strerror(rc) << ")";
432  }
433  close(filedes[0]);
434 
 // Wait for the child, retrying when waitpid is interrupted.
435  int status = 0;
436  errno = 0;
437  do {
438  if (waitpid(pid, &status, 0) < 0) {
439  if (errno == EINTR) {
440  continue;
441  } else {
442  throw cms::Exception("ExternalLHEProducer") << "Failed to read child status (errno=" << errno << ", " << strerror(errno) << ")";
443  }
444  }
445  if (WIFSIGNALED(status)) {
446  throw cms::Exception("ExternalLHEProducer") << "Child exited due to signal " << WTERMSIG(status) << ".";
447  }
448  if (WIFEXITED(status)) {
449  rc = WEXITSTATUS(status);
450  break;
451  }
452  } while (true);
 // NOTE(review): the declaration of ts is not visible here. Only the
 // children's system time (ru_stime) is added; user time (ru_utime) is not -
 // confirm this is intended.
454  if(ts.isAvailable()) {
455  struct rusage ru;
456  getrusage(RUSAGE_CHILDREN,&ru);
457  double time = static_cast<double>(ru.ru_stime.tv_sec) + (static_cast<double>(ru.ru_stime.tv_usec) * 1E-6);
458  ts->addToCPUTime(time);
459  }
460  if (rc) {
461  throw cms::Exception("ExternalLHEProducer") << "Child failed with exit code " << rc << ".";
462  }
463 
464 }
465 
466 // ------------ Read the output script ------------
467 #define BUFSIZE 4096
468 std::unique_ptr<std::string> ExternalLHEProducer::readOutput()
469 {
470  int fd;
471  ssize_t n;
472  char buf[BUFSIZE];
473 
474  if ((fd = open(outputFile_.c_str(), O_RDONLY)) == -1) {
475  throw cms::Exception("OutputOpenError") << "Unable to open script output file " << outputFile_ << " (errno=" << errno << ", " << strerror(errno) << ").";
476  }
477 
478  std::stringstream ss;
479  while ((n = read(fd, buf, BUFSIZE)) > 0 || (n == -1 && errno == EINTR)) {
480  if (n > 0)
481  ss.write(buf, n);
482  }
483  if (n == -1) {
484  throw cms::Exception("OutputOpenError") << "Unable to read from script output file " << outputFile_ << " (errno=" << errno << ", " << strerror(errno) << ").";
485  }
486 
487  if (unlink(outputFile_.c_str())) {
488  throw cms::Exception("OutputDeleteError") << "Unable to delete original script output file " << outputFile_ << " (errno=" << errno << ", " << strerror(errno) << ").";
489  }
490 
491  return std::unique_ptr<std::string>(new std::string(ss.str()));
492 }
493 
494 // ------------ method fills 'descriptions' with the allowed parameters for the module ------------
// Declares the configuration parameters of this module and registers the
// description as the default one.
495 void
497  // Parameters mirror what the constructor reads: tracked "scriptName",
498  // "outputFile", "args", "numberOfParameters"; untracked "nEvents", "storeXML".
500  desc.setComment("Executes an external script and places its output file into an EDM collection");
501 
 // "scriptName" defaults to a default-constructed FileInPath.
502  edm::FileInPath thePath;
503  desc.add<edm::FileInPath>("scriptName", thePath);
504  desc.add<std::string>("outputFile", "myoutput");
 // No default value is supplied for "args", "numberOfParameters" and "nEvents".
505  desc.add<std::vector<std::string> >("args");
506  desc.add<uint32_t>("numberOfParameters");
507  desc.addUntracked<uint32_t>("nEvents");
508  desc.addUntracked<bool>("storeXML", false);
509 
510  descriptions.addDefault(desc);
511 }
512 
514 {
515 
 // An event is already pending (not yet consumed by produce): keep it.
516  if (partonLevel)
517  return;
518 
 // No reader yet (or it has been reset): nothing to read.
519  if(not reader_) { return;}
520  partonLevel = reader_->next();
 // The reader returned no event.
521  if (!partonLevel)
522  return;
523 
 // Publish the run info through runInfo only when it differs from the one
 // seen with the previous event.
524  boost::shared_ptr<lhef::LHERunInfo> runInfoThis = partonLevel->getRunInfo();
525  if (runInfoThis != runInfoLast) {
526  runInfo = runInfoThis;
527  runInfoLast = runInfoThis;
528  }
529 }
530 
531 //define this as a plug-in
#define LogDebug(id)
boost::shared_ptr< lhef::LHEEvent > partonLevel
void beginRunProduce(edm::Run &run, edm::EventSetup const &es) override
void endRunProduce(edm::Run &, edm::EventSetup const &) override
OrphanHandle< PROD > put(std::unique_ptr< PROD > product)
Put a new product.
Definition: Event.h:127
std::unique_ptr< std::string > readOutput()
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void addHeader(const Header &header)
void addComment(const std::string &line)
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:17
void produce(edm::Event &, const edm::EventSetup &) override
void addWeight(const WGT &wgt)
#define NULL
Definition: scimark2.h:8
std::vector< std::string > args_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
virtual void addToCPUTime(double iTime)=0
boost::shared_ptr< lhef::LHERunInfo > runInfoLast
void setComment(std::string const &value)
int closeDescriptors(int preserve)
int iEvent
Definition: GenABIO.cc:230
void addDefault(ParameterSetDescription const &psetDescription)
std::unique_ptr< lhef::LHEReader > reader_
bool isAvailable() const
Definition: Service.h:46
#define BUFSIZE
def pipe(cmdline, input=None)
Definition: pipe.py:5
ParameterDescriptionBase * add(U const &iLabel, T const &value)
virtual std::uint32_t mySeed() const =0
boost::shared_ptr< lhef::LHERunInfo > runInfo
void addComment(const std::string &line)
auto dp
Definition: deltaR.h:22
void preallocThreads(unsigned int) override
void put(std::unique_ptr< PROD > product)
Put a new product.
Definition: Run.h:114
HLT enums.
boost::ptr_deque< LHERunInfoProduct > runInfoProducts
dbl *** dir
Definition: mlp_gen.cc:35
def write(self, setup)
ExternalLHEProducer(const edm::ParameterSet &iConfig)
def move(src, dest)
Definition: eostools.py:510
Definition: Run.h:43