CMS 3D CMS Logo

postprocessor.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from PhysicsTools.NanoAODTools.postprocessing.framework.jobreport import JobReport
3 from PhysicsTools.NanoAODTools.postprocessing.framework.preskimming import preSkim
4 from PhysicsTools.NanoAODTools.postprocessing.framework.output import FriendOutput, FullOutput
6 from PhysicsTools.NanoAODTools.postprocessing.framework.datamodel import InputTree
7 from PhysicsTools.NanoAODTools.postprocessing.framework.branchselection import BranchSelection
8 import os
9 import time
10 import hashlib
11 import subprocess
12 import ROOT
13 ROOT.PyConfig.IgnoreCommandLineOptions = True
14 
15 
def __init__(
    self, outputDir, inputFiles, cut=None, branchsel=None, modules=None,
    compression="LZMA:9", friend=False, postfix=None, jsonInput=None,
    noOut=False, justcount=False, provenance=False, haddFileName=None,
    fwkJobReport=False, histFileName=None, histDirName=None,
    outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False,
    longTermCache=False
):
    """Store the post-processing configuration; no input file is opened here.

    Args:
        outputDir: directory in which the per-input output files are written.
        inputFiles: input file names. An entry may be a comma-separated list
            "main.root,extra1.root,..." whose extra files are added as friends.
        cut: selection string forwarded to ``preSkim``.
        branchsel: branch-selection file applied to the input (wrapped in
            ``BranchSelection``); also used for the output when
            ``outputbranchsel`` is not given.
        modules: modules to run over the events. None (the default) means no
            modules, in which case the selected entries are cloned without an
            event loop.
        compression: "ALGO:LEVEL" with ALGO one of LZMA/ZLIB/LZ4, or "none".
        friend: write output with ``FriendOutput`` instead of ``FullOutput``.
        postfix: output file-name suffix; defaults to "_Friend" or "_Skim".
        jsonInput: JSON selection forwarded to ``preSkim``.
        noOut: do not write output files (only valid together with modules).
        justcount: only report how many entries would be selected.
        provenance: forwarded to ``FullOutput``.
        haddFileName: if set, merge all outputs into this file with haddnano.py.
        fwkJobReport: produce a framework job report; this implies the final
            hadd, so ``haddFileName`` defaults to "tree.root".
        histFileName, histDirName: histogram output file and directory; they
            must be given together.
        outputbranchsel: branch-selection file applied to the output tree.
        maxEntries: maximum entries per input file; a falsy value (None/0)
            means no limit.
        firstEntry: first entry to process in each input file.
        prefetch: copy ``root://`` inputs to $TMPDIR with xrdcp before reading.
        longTermCache: keep prefetched copies across runs (cleanup is then up
            to you).
    """
    self.outputDir = outputDir
    self.inputFiles = inputFiles
    self.cut = cut
    # A fresh list per instance: a ``[]`` default would be a single list
    # object shared by every instance created without explicit modules.
    self.modules = [] if modules is None else modules
    self.compression = compression
    self.postfix = postfix
    self.json = jsonInput
    self.noOut = noOut
    self.friend = friend
    self.justcount = justcount
    self.provenance = provenance
    self.jobReport = JobReport() if fwkJobReport else None
    self.haddFileName = haddFileName
    self.histFile = None  # opened later, when processing starts
    if self.jobReport and not self.haddFileName:
        print("Because you requested a FJR we assume you want the final "
              "hadd. No name specified for the output file, will use tree.root")
        self.haddFileName = "tree.root"
    self.branchsel = BranchSelection(branchsel) if branchsel else None
    if outputbranchsel is not None:
        self.outputbranchsel = BranchSelection(outputbranchsel)
    elif branchsel is not None:
        # Use the same branches in the output as in input
        self.outputbranchsel = BranchSelection(branchsel)
    else:
        self.outputbranchsel = None

    self.histFileName = histFileName
    self.histDirName = histDirName
    # 2^63 - 1, largest int64: used as "no limit"
    self.maxEntries = maxEntries if maxEntries else 9223372036854775807
    self.firstEntry = firstEntry
    self.prefetch = prefetch  # prefetch files to TMPDIR using xrdcp
    # keep cached files across runs (it's then up to you to clean up the temp)
    self.longTermCache = longTermCache
61 
def prefetchFile(self, fname, verbose=True):
    """Copy a remote ``root://`` file to a local temporary path using xrdcp.

    Args:
        fname: input file name; only names starting with ``root://`` are
            copied, anything else is returned unchanged.
        verbose: print progress and error messages.

    Returns:
        ``(path, toBeDeleted)``: ``path`` is the file to open (the local copy
        on success, otherwise ``fname`` itself) and ``toBeDeleted`` is True
        only for a fresh copy that is not kept as a long-term cache entry.
    """
    if not fname.startswith("root://"):
        return fname, False
    tmpdir = os.environ.get("TMPDIR", "/tmp")
    if self.longTermCache:
        # Deterministic name (user id + hash of the source name) so that a
        # later run can find and reuse the same local copy.
        tag = "long_cache-id%d-%s" % (
            os.getuid(), hashlib.sha1(fname.encode('utf-8')).hexdigest())
    else:
        tag = "".join([hex(i)[2:] for i in bytearray(os.urandom(8))])
    localfile = "%s/%s-%s.root" % (
        tmpdir, os.path.basename(fname).replace(".root", ""), tag)
    if self.longTermCache and os.path.exists(localfile):
        if verbose:
            print("Filename %s is already available in local path %s "
                  % (fname, localfile))
        return localfile, False
    if verbose:
        print("Filename %s is remote, will do a copy to local path %s"
              % (fname, localfile))
    start = time.time()
    copied = False
    try:
        subprocess.check_output(["xrdcp", "-f", "-N", fname, localfile])
        copied = True
    except (subprocess.CalledProcessError, OSError):
        # xrdcp failed or is not installed: fall back to reading remotely.
        if verbose:
            print("Error: could not save file locally, will run from remote")
    finally:
        # Runs on failure and also on KeyboardInterrupt/SystemExit, which are
        # then propagated instead of being swallowed by a bare ``except``.
        if not copied and os.path.exists(localfile):
            if verbose:
                print("Deleting partially transferred file %s" % localfile)
            try:
                os.unlink(localfile)
            except OSError:
                pass
    if not copied:
        return fname, False
    if verbose:
        print("Time used for transferring the file locally: %.2f s"
              % (time.time() - start))
    return localfile, (not self.longTermCache)
97 
def run(self):
    """Process every input file and write the selected/skimmed output.

    For each entry of ``self.inputFiles`` (optionally "main.root,extra.root,..."
    where the extra files are added as friends), the "Events" tree (or
    "Friends" as a fallback) is pre-skimmed with ``preSkim``, run through
    ``self.modules`` with ``eventLoop`` when modules are given, and written to
    ``<outputDir>/<basename><postfix>.root`` unless ``noOut`` is set. After all
    files, each module's ``endJob`` is called, the histogram file is closed and
    the outputs are optionally merged with haddnano.py.

    Raises:
        RuntimeError: unsupported compression algorithm, ``noOut`` without
            modules, or only one of histFileName/histDirName given.
    """
    outpostfix = self.postfix if self.postfix is not None else (
        "_Friend" if self.friend else "_Skim")
    if not self.noOut:
        if self.compression != "none":
            ROOT.gInterpreter.ProcessLine("#include <Compression.h>")
            (algo, level) = self.compression.split(":")
            compressionLevel = int(level)
            if algo == "LZMA":
                compressionAlgo = ROOT.ROOT.kLZMA
            elif algo == "ZLIB":
                compressionAlgo = ROOT.ROOT.kZLIB
            elif algo == "LZ4":
                compressionAlgo = ROOT.ROOT.kLZ4
            else:
                raise RuntimeError("Unsupported compression %s" % algo)
        else:
            compressionLevel = 0
        print("Will write selected trees to " + self.outputDir)
        if not self.justcount:
            # os.makedirs instead of a shell "mkdir -p": paths with spaces or
            # shell metacharacters are handled correctly. An empty outputDir
            # means the current directory, which needs no creation.
            if self.outputDir and not os.path.exists(self.outputDir):
                os.makedirs(self.outputDir, exist_ok=True)
    else:
        compressionLevel = 0

    if self.noOut and len(self.modules) == 0:
        raise RuntimeError(
            "Running with --noout and no modules does nothing!")

    # Open histogram file, if desired
    if (self.histFileName is None) != (self.histDirName is None):
        raise RuntimeError(
            "Must specify both histogram file and histogram directory!")
    elif self.histFileName is not None:
        self.histFile = ROOT.TFile.Open(self.histFileName, "RECREATE")
    else:
        self.histFile = None

    for m in self.modules:
        if hasattr(m, 'writeHistFile') and m.writeHistFile:
            m.beginJob(histFile=self.histFile,
                       histDirName=self.histDirName)
        else:
            m.beginJob()

    # Without modules there is no event loop: the selected entries are cloned.
    fullClone = (len(self.modules) == 0)
    outFileNames = []
    t0 = time.time()
    totEntriesRead = 0
    for fname in self.inputFiles:
        ffnames = []
        if "," in fname:
            fnames = fname.split(',')
            fname, ffnames = fnames[0], fnames[1:]

        fname = fname.strip()

        # Convert LFN to PFN if needed; this requires edmFileUtil to be present in $PATH
        if fname.startswith('/store/'):
            fname = subprocess.check_output(['edmFileUtil', '-d', '-f '+fname]).decode("utf-8").strip()

        # open input file
        print(time.strftime("%d-%b-%Y %H:%M:%S %Z", time.localtime()), " Initiating request to open file %s" % (fname), flush=True)  # CMSSW-style message, required by eos caching scripts
        if self.prefetch:
            ftoread, toBeDeleted = self.prefetchFile(fname)
            inFile = ROOT.TFile.Open(ftoread)
        else:
            inFile = ROOT.TFile.Open(fname)

        # get input tree
        inTree = inFile.Get("Events")
        if inTree is None:
            inTree = inFile.Get("Friends")
        nEntries = min(inTree.GetEntries() - self.firstEntry, self.maxEntries)
        totEntriesRead += nEntries
        # pre-skimming
        elist, jsonFilter = preSkim(
            inTree, self.json, self.cut, maxEntries=self.maxEntries, firstEntry=self.firstEntry)
        nPreselected = elist.GetN() if elist else nEntries
        preselFraction = nPreselected / (0.01 * nEntries) if nEntries else 0
        if self.justcount:
            print('Would select %d / %d entries from %s (%.2f%%)' % (nPreselected, nEntries, fname, preselFraction))
            if self.prefetch and toBeDeleted:
                os.unlink(ftoread)
            continue
        print('Pre-select %d entries out of %s (%.2f%%)' % (nPreselected, nEntries, preselFraction))

        # Attach the extra files of a comma-separated entry as friend trees.
        inAddFiles = []
        inAddTrees = []
        for ffname in ffnames:
            inAddFiles.append(ROOT.TFile.Open(ffname))
            inAddTree = inAddFiles[-1].Get("Events")
            if inAddTree is None:
                inAddTree = inAddFiles[-1].Get("Friends")
            inAddTrees.append(inAddTree)
            inTree.AddFriend(inAddTree)

        if fullClone:
            # no need of a reader (no event loop), but set up the elist if available
            if elist:
                inTree.SetEntryList(elist)
        else:
            # initialize reader
            if elist:
                inTree = InputTree(inTree, elist)
            else:
                inTree = InputTree(inTree)

        # prepare output file
        if not self.noOut:
            outFileName = os.path.join(self.outputDir, os.path.basename(
                fname).replace(".root", outpostfix + ".root"))
            outFile = ROOT.TFile.Open(
                outFileName, "RECREATE", "", compressionLevel)
            outFileNames.append(outFileName)
            if compressionLevel:
                outFile.SetCompressionAlgorithm(compressionAlgo)
            # prepare output tree
            if self.friend:
                outTree = FriendOutput(inFile, inTree, outFile)
            else:
                firstEntry = 0 if fullClone and elist else self.firstEntry
                outTree = FullOutput(
                    inFile,
                    inTree,
                    outFile,
                    branchSelection=self.branchsel,
                    outputbranchSelection=self.outputbranchsel,
                    fullClone=fullClone,
                    maxEntries=self.maxEntries,
                    firstEntry=firstEntry,
                    jsonFilter=jsonFilter,
                    provenance=self.provenance)
        else:
            outFile = None
            outTree = None
            if self.branchsel:
                self.branchsel.selectBranches(inTree)

        # process events, if needed
        if not fullClone and not (elist and elist.GetN() == 0):
            eventRange = range(self.firstEntry, self.firstEntry +
                               nEntries) if nEntries > 0 and not elist else None
            (nall, npass, _) = eventLoop(
                self.modules, inFile, outFile, inTree, outTree,
                eventRange=eventRange, maxEvents=self.maxEntries
            )
            print('Processed %d preselected entries from %s (%s entries). Finally selected %d entries' % (nall, fname, nEntries, npass))
        else:
            # No event loop (pure clone, or empty entry list). nall is always
            # set here so the job report never sees an undefined name (noOut
            # case) or a stale value left over from the previous file.
            nall = nEntries
            if outTree is not None:
                nOut = outTree.tree().GetEntries()
                print('Selected %d / %d entries from %s (%.2f%%)' % (nOut, nall, fname, nOut / (0.01 * nall) if nall else 0))

        # now write the output
        if not self.noOut:
            outTree.write()
            outFile.Close()
            print("Done %s" % outFileName)
        if self.jobReport:
            self.jobReport.addInputFile(fname, nall)
        if self.prefetch and toBeDeleted:
            os.unlink(ftoread)

    for m in self.modules:
        m.endJob()

    # close histogram file
    # NOTE(review): kept as ``!= None`` rather than ``is not None``: PyROOT null
    # pointers compare equal to None, so this presumably also skips a failed
    # (null) TFile.Open result -- verify before changing.
    if self.histFile != None:
        self.histFile.Close()

    elapsed = time.time() - t0
    rate = totEntriesRead / elapsed if elapsed > 0 else 0.0
    print("Total time %.1f sec. to process %i events. Rate = %.1f Hz." % (elapsed, totEntriesRead, rate))

    if self.haddFileName:
        haddnano = "./haddnano.py" if os.path.isfile(
            "./haddnano.py") else "haddnano.py"
        # Argument list, no shell: file names with spaces or shell
        # metacharacters are passed through verbatim.
        try:
            status = subprocess.call(
                [haddnano, self.haddFileName] + outFileNames)
        except OSError as err:
            print("Error: could not run %s: %s" % (haddnano, err))
        else:
            if status != 0:
                print("Warning: %s exited with status %d" % (haddnano, status))
    if self.jobReport:
        self.jobReport.addOutputFile(self.haddFileName)
        self.jobReport.save()
def prefetchFile(self, fname, verbose=True)
def replace(string, replacements)
def eventLoop(filename)
Definition: example.py:20
def __init__(self, outputDir, inputFiles, cut=None, branchsel=None, modules=[], compression="LZMA:9", friend=False, postfix=None, jsonInput=None, noOut=False, justcount=False, provenance=False, haddFileName=None, fwkJobReport=False, histFileName=None, histDirName=None, outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False, longTermCache=False)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def preSkim(tree, jsonInput=None, cutstring=None, maxEntries=None, firstEntry=0)
Definition: preskimming.py:59
bool decode(bool &, std::string_view)
Definition: types.cc:72
save
Definition: cuy.py:1164
def InputTree(tree, entrylist=ROOT.MakeNullPointer(ROOT.TEntryList))
T * Get(Args... args)
Definition: Trend.h:122