CMS 3D CMS Logo

postprocessor.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from PhysicsTools.NanoAODTools.postprocessing.framework.jobreport import JobReport
3 from PhysicsTools.NanoAODTools.postprocessing.framework.preskimming import preSkim
4 from PhysicsTools.NanoAODTools.postprocessing.framework.output import FriendOutput, FullOutput
6 from PhysicsTools.NanoAODTools.postprocessing.framework.datamodel import InputTree
7 from PhysicsTools.NanoAODTools.postprocessing.framework.branchselection import BranchSelection
8 import os
9 import time
10 import hashlib
11 import subprocess
12 import ROOT
13 ROOT.PyConfig.IgnoreCommandLineOptions = True
14 
15 
17  def __init__(
18  self, outputDir, inputFiles, cut=None, branchsel=None, modules=[],
19  compression="LZMA:9", friend=False, postfix=None, jsonInput=None,
20  noOut=False, justcount=False, provenance=False, haddFileName=None,
21  fwkJobReport=False, histFileName=None, histDirName=None,
22  outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False,
23  longTermCache=False
24  ):
25  self.outputDir = outputDir
26  self.inputFiles = inputFiles
27  self.cut = cut
28  self.modules = modules
29  self.compression = compression
30  self.postfix = postfix
31  self.json = jsonInput
32  self.noOut = noOut
33  self.friend = friend
34  self.justcount = justcount
35  self.provenance = provenance
36  self.jobReport = JobReport() if fwkJobReport else None
37  self.haddFileName = haddFileName
38  self.histFile = None
39  self.histDirName = None
40  if self.jobReport and not self.haddFileName:
41  print("Because you requested a FJR we assume you want the final " \
42  "hadd. No name specified for the output file, will use tree.root")
43  self.haddFileName = "tree.root"
44  self.branchsel = BranchSelection(branchsel) if branchsel else None
45  if outputbranchsel is not None:
46  self.outputbranchsel = BranchSelection(outputbranchsel)
47  elif outputbranchsel is None and branchsel is not None:
48  # Use the same branches in the output as in input
49  self.outputbranchsel = BranchSelection(branchsel)
50  else:
51  self.outputbranchsel = None
52 
53  self.histFileName = histFileName
54  self.histDirName = histDirName
55  # 2^63 - 1, largest int64
56  self.maxEntries = maxEntries if maxEntries else 9223372036854775807
57  self.firstEntry = firstEntry
58  self.prefetch = prefetch # prefetch files to TMPDIR using xrdcp
59  # keep cached files across runs (it's then up to you to clean up the temp)
60  self.longTermCache = longTermCache
61 
62  def prefetchFile(self, fname, verbose=True):
63  tmpdir = os.environ['TMPDIR'] if 'TMPDIR' in os.environ else "/tmp"
64  if not fname.startswith("root://"):
65  return fname, False
66  rndchars = "".join([hex(i)[2:] for i in bytearray(os.urandom(8))]) \
67  if not self.longTermCache else "long_cache-id%d-%s" \
68  % (os.getuid(), hashlib.sha1(fname.encode('utf-8')).hexdigest())
69  localfile = "%s/%s-%s.root" \
70  % (tmpdir, os.path.basename(fname).replace(".root", ""), rndchars)
71  if self.longTermCache and os.path.exists(localfile):
72  if verbose:
73  print("Filename %s is already available in local path %s " \
74  % (fname, localfile))
75  return localfile, False
76  try:
77  if verbose:
78  print("Filename %s is remote, will do a copy to local path %s"\
79  % (fname, localfile))
80  start = time.time()
81  subprocess.check_output(["xrdcp", "-f", "-N", fname, localfile])
82  if verbose:
83  print("Time used for transferring the file locally: %.2f s"\
84  % (time.time() - start))
85  return localfile, (not self.longTermCache)
86  except:
87  if verbose:
88  print("Error: could not save file locally, will run from remote")
89  if os.path.exists(localfile):
90  if verbose:
91  print("Deleting partially transferred file %s" % localfile)
92  try:
93  os.unlink(localfile)
94  except:
95  pass
96  return fname, False
97 
98  def run(self):
99  outpostfix = self.postfix if self.postfix is not None else (
100  "_Friend" if self.friend else "_Skim")
101  if not self.noOut:
102 
103  if self.compression != "none":
104  ROOT.gInterpreter.ProcessLine("#include <Compression.h>")
105  (algo, level) = self.compression.split(":")
106  compressionLevel = int(level)
107  if algo == "LZMA":
108  compressionAlgo = ROOT.ROOT.kLZMA
109  elif algo == "ZLIB":
110  compressionAlgo = ROOT.ROOT.kZLIB
111  elif algo == "LZ4":
112  compressionAlgo = ROOT.ROOT.kLZ4
113  else:
114  raise RuntimeError("Unsupported compression %s" % algo)
115  else:
116  compressionLevel = 0
117  print("Will write selected trees to " + self.outputDir)
118  if not self.justcount:
119  if not os.path.exists(self.outputDir):
120  os.system("mkdir -p " + self.outputDir)
121  else:
122  compressionLevel = 0
123 
124  if self.noOut:
125  if len(self.modules) == 0:
126  raise RuntimeError(
127  "Running with --noout and no modules does nothing!")
128 
129  # Open histogram file, if desired
130  if (self.histFileName is not None and self.histDirName is None) or (self.histFileName is None and self.histDirName is not None):
131  raise RuntimeError(
132  "Must specify both histogram file and histogram directory!")
133  elif self.histFileName is not None and self.histDirName is not None:
134  self.histFile = ROOT.TFile.Open(self.histFileName, "RECREATE")
135  else:
136  self.histFile = None
137 
138  for m in self.modules:
139  if hasattr(m, 'writeHistFile') and m.writeHistFile:
140  m.beginJob(histFile=self.histFile,
141  histDirName=self.histDirName)
142  else:
143  m.beginJob()
144 
145  fullClone = (len(self.modules) == 0)
146  outFileNames = []
147  t0 = time.time()
148  totEntriesRead = 0
149  for fname in self.inputFiles:
150  ffnames = []
151  if "," in fname:
152  fnames = fname.split(',')
153  fname, ffnames = fnames[0], fnames[1:]
154 
155  fname = fname.strip()
156 
157  # Convert LFN to PFN if needed; this requires edmFileUtil to be present in $PATH
158  if fname.startswith('/store/') :
159  fname = subprocess.check_output(['edmFileUtil', '-d', '-f '+fname]).decode("utf-8").strip()
160 
161  # open input file
162  if self.prefetch:
163  ftoread, toBeDeleted = self.prefetchFile(fname)
164  inFile = ROOT.TFile.Open(ftoread)
165  else:
166  inFile = ROOT.TFile.Open(fname)
167 
168  # get input tree
169  inTree = inFile.Get("Events")
170  if inTree is None:
171  inTree = inFile.Get("Friends")
172  nEntries = min(inTree.GetEntries() -
173  self.firstEntry, self.maxEntries)
174  totEntriesRead += nEntries
175  # pre-skimming
176  elist, jsonFilter = preSkim(
177  inTree, self.json, self.cut, maxEntries=self.maxEntries, firstEntry=self.firstEntry)
178  if self.justcount:
179  print('Would select %d / %d entries from %s (%.2f%%)' % (elist.GetN() if elist else nEntries, nEntries, fname, (elist.GetN() if elist else nEntries) / (0.01 * nEntries) if nEntries else 0))
180  if self.prefetch:
181  if toBeDeleted:
182  os.unlink(ftoread)
183  continue
184  else:
185  print('Pre-select %d entries out of %s (%.2f%%)' % (elist.GetN() if elist else nEntries, nEntries, (elist.GetN() if elist else nEntries) / (0.01 * nEntries) if nEntries else 0))
186  inAddFiles = []
187  inAddTrees = []
188  for ffname in ffnames:
189  inAddFiles.append(ROOT.TFile.Open(ffname))
190  inAddTree = inAddFiles[-1].Get("Events")
191  if inAddTree is None:
192  inAddTree = inAddFiles[-1].Get("Friends")
193  inAddTrees.append(inAddTree)
194  inTree.AddFriend(inAddTree)
195 
196  if fullClone:
197  # no need of a reader (no event loop), but set up the elist if available
198  if elist:
199  inTree.SetEntryList(elist)
200  else:
201  # initialize reader
202  if elist:
203  inTree = InputTree(inTree, elist)
204  else:
205  inTree = InputTree(inTree)
206 
207  # prepare output file
208  if not self.noOut:
209  outFileName = os.path.join(self.outputDir, os.path.basename(
210  fname).replace(".root", outpostfix + ".root"))
211  outFile = ROOT.TFile.Open(
212  outFileName, "RECREATE", "", compressionLevel)
213  outFileNames.append(outFileName)
214  if compressionLevel:
215  outFile.SetCompressionAlgorithm(compressionAlgo)
216  # prepare output tree
217  if self.friend:
218  outTree = FriendOutput(inFile, inTree, outFile)
219  else:
220  firstEntry = 0 if fullClone and elist else self.firstEntry
221  outTree = FullOutput(
222  inFile,
223  inTree,
224  outFile,
225  branchSelection=self.branchsel,
226  outputbranchSelection=self.outputbranchsel,
227  fullClone=fullClone,
228  maxEntries=self.maxEntries,
229  firstEntry=firstEntry,
230  jsonFilter=jsonFilter,
231  provenance=self.provenance)
232  else:
233  outFile = None
234  outTree = None
235  if self.branchsel:
236  self.branchsel.selectBranches(inTree)
237 
238  # process events, if needed
239  if not fullClone and not (elist and elist.GetN() == 0):
240  eventRange = range(self.firstEntry, self.firstEntry +
241  nEntries) if nEntries > 0 and not elist else None
242  (nall, npass, timeLoop) = eventLoop(
243  self.modules, inFile, outFile, inTree, outTree,
244  eventRange=eventRange, maxEvents=self.maxEntries
245  )
246  print('Processed %d preselected entries from %s (%s entries). Finally selected %d entries' % (nall, fname, nEntries, npass))
247  elif outTree is not None:
248  nall = nEntries
249  print('Selected %d / %d entries from %s (%.2f%%)' % (outTree.tree().GetEntries(), nall, fname, outTree.tree().GetEntries() / (0.01 * nall) if nall else 0))
250 
251  # now write the output
252  if not self.noOut:
253  outTree.write()
254  outFile.Close()
255  print("Done %s" % outFileName)
256  if self.jobReport:
257  self.jobReport.addInputFile(fname, nall)
258  if self.prefetch:
259  if toBeDeleted:
260  os.unlink(ftoread)
261 
262  for m in self.modules:
263  m.endJob()
264 
265  print("Total time %.1f sec. to process %i events. Rate = %.1f Hz." % ((time.time() - t0), totEntriesRead, totEntriesRead / (time.time() - t0)))
266 
267  if self.haddFileName:
268  haddnano = "./haddnano.py" if os.path.isfile(
269  "./haddnano.py") else "haddnano.py"
270  os.system("%s %s %s" %
271  (haddnano, self.haddFileName, " ".join(outFileNames)))
272  if self.jobReport:
273  self.jobReport.addOutputFile(self.haddFileName)
274  self.jobReport.save()
def prefetchFile(self, fname, verbose=True)
def replace(string, replacements)
def eventLoop(filename)
Definition: example.py:20
def __init__(self, outputDir, inputFiles, cut=None, branchsel=None, modules=[], compression="LZMA:9", friend=False, postfix=None, jsonInput=None, noOut=False, justcount=False, provenance=False, haddFileName=None, fwkJobReport=False, histFileName=None, histDirName=None, outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False, longTermCache=False)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
bool decode(bool &, std::string const &)
Definition: types.cc:71
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def preSkim(tree, jsonInput=None, cutstring=None, maxEntries=None, firstEntry=0)
Definition: preskimming.py:59
save
Definition: cuy.py:1164
def InputTree(tree, entrylist=ROOT.MakeNullPointer(ROOT.TEntryList))
T * Get(Args... args)
Definition: Trend.h:122