CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
postprocessor.PostProcessor Class Reference

Public Member Functions

def __init__ (self, outputDir, inputFiles, cut=None, branchsel=None, modules=[], compression="LZMA:9", friend=False, postfix=None, jsonInput=None, noOut=False, justcount=False, provenance=False, haddFileName=None, fwkJobReport=False, histFileName=None, histDirName=None, outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False, longTermCache=False)
 
def prefetchFile (self, fname, verbose=True)
 
def run (self)
 

Public Attributes

 branchsel
 
 compression
 
 cut
 
 firstEntry
 
 friend
 
 haddFileName
 
 histDirName
 
 histFile
 
 histFileName
 
 inputFiles
 
 jobReport
 
 json
 
 justcount
 
 longTermCache
 
 maxEntries
 
 modules
 
 noOut
 
 outputbranchsel
 
 outputDir
 
 postfix
 
 prefetch
 
 provenance
 

Detailed Description

Definition at line 16 of file postprocessor.py.

Constructor & Destructor Documentation

◆ __init__()

def postprocessor.PostProcessor.__init__ (   self,
  outputDir,
  inputFiles,
  cut = None,
  branchsel = None,
  modules = [],
  compression = "LZMA:9",
  friend = False,
  postfix = None,
  jsonInput = None,
  noOut = False,
  justcount = False,
  provenance = False,
  haddFileName = None,
  fwkJobReport = False,
  histFileName = None,
  histDirName = None,
  outputbranchsel = None,
  maxEntries = None,
  firstEntry = 0,
  prefetch = False,
  longTermCache = False 
)

Definition at line 24 of file postprocessor.py.

24  ):
25  self.outputDir = outputDir
26  self.inputFiles = inputFiles
27  self.cut = cut
28  self.modules = modules
29  self.compression = compression
30  self.postfix = postfix
31  self.json = jsonInput
32  self.noOut = noOut
33  self.friend = friend
34  self.justcount = justcount
35  self.provenance = provenance
36  self.jobReport = JobReport() if fwkJobReport else None
37  self.haddFileName = haddFileName
38  self.histFile = None
39  self.histDirName = None
40  if self.jobReport and not self.haddFileName:
41  print("Because you requested a FJR we assume you want the final " \
42  "hadd. No name specified for the output file, will use tree.root")
43  self.haddFileName = "tree.root"
44  self.branchsel = BranchSelection(branchsel) if branchsel else None
45  if outputbranchsel is not None:
46  self.outputbranchsel = BranchSelection(outputbranchsel)
47  elif outputbranchsel is None and branchsel is not None:
48  # Use the same branches in the output as in input
49  self.outputbranchsel = BranchSelection(branchsel)
50  else:
51  self.outputbranchsel = None
52 
53  self.histFileName = histFileName
54  self.histDirName = histDirName
55  # 2^63 - 1, largest int64
56  self.maxEntries = maxEntries if maxEntries else 9223372036854775807
57  self.firstEntry = firstEntry
58  self.prefetch = prefetch # prefetch files to TMPDIR using xrdcp
59  # keep cached files across runs (it's then up to you to clean up the temp)
60  self.longTermCache = longTermCache
61 
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47

Member Function Documentation

◆ prefetchFile()

def postprocessor.PostProcessor.prefetchFile (   self,
  fname,
  verbose = True 
)

Definition at line 62 of file postprocessor.py.

References join(), postprocessor.PostProcessor.longTermCache, print(), and python.rootplot.root2matplotlib.replace().

Referenced by postprocessor.PostProcessor.run().

62  def prefetchFile(self, fname, verbose=True):
63  tmpdir = os.environ['TMPDIR'] if 'TMPDIR' in os.environ else "/tmp"
64  if not fname.startswith("root://"):
65  return fname, False
66  rndchars = "".join([hex(i)[2:] for i in bytearray(os.urandom(8))]) \
67  if not self.longTermCache else "long_cache-id%d-%s" \
68  % (os.getuid(), hashlib.sha1(fname.encode('utf-8')).hexdigest())
69  localfile = "%s/%s-%s.root" \
70  % (tmpdir, os.path.basename(fname).replace(".root", ""), rndchars)
71  if self.longTermCache and os.path.exists(localfile):
72  if verbose:
73  print("Filename %s is already available in local path %s " \
74  % (fname, localfile))
75  return localfile, False
76  try:
77  if verbose:
78  print("Filename %s is remote, will do a copy to local path %s"\
79  % (fname, localfile))
80  start = time.time()
81  subprocess.check_output(["xrdcp", "-f", "-N", fname, localfile])
82  if verbose:
83  print("Time used for transferring the file locally: %.2f s"\
84  % (time.time() - start))
85  return localfile, (not self.longTermCache)
86  except:
87  if verbose:
88  print("Error: could not save file locally, will run from remote")
89  if os.path.exists(localfile):
90  if verbose:
91  print("Deleting partially transferred file %s" % localfile)
92  try:
93  os.unlink(localfile)
94  except:
95  pass
96  return fname, False
97 
def replace(string, replacements)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19

◆ run()

def postprocessor.PostProcessor.run (   self)

Definition at line 98 of file postprocessor.py.

References postprocessor.PostProcessor.branchsel, BCoptions.compression, postprocessor.PostProcessor.compression, ZDCTask.cut, QIE10Task.cut, QIE11Task.cut, postprocessor.PostProcessor.cut, reco::parser::Grammar::definition< ScannerT >.cut, TriggerObjectTableProducer::SelectedObject.cut, edm.decode(), example.eventLoop(), postprocessor.PostProcessor.firstEntry, output.FullOutput.firstEntry, StripValidationPlots.firstEntry, postprocessor.PostProcessor.friend, Get(), postprocessor.PostProcessor.haddFileName, postprocessor.PostProcessor.histDirName, eventloop.Module.histFile, postprocessor.PostProcessor.histFile, postprocessor.PostProcessor.histFileName, postprocessor.PostProcessor.inputFiles, sistrip::EnsembleCalibrationLA.inputFiles, sistrip::MeasureLA.inputFiles, VarParsing.VarParsing.inputFiles, treeReaderArrayTools.InputTree(), createfilelist.int, WorkFlowRunner.WorkFlowRunner.jobReport, postprocessor.PostProcessor.jobReport, join(), postprocessor.PostProcessor.json, config.DataComponent.json, SummaryOutputProducer.json, SummaryOutputProducer::GenericSummary.json, postprocessor.PostProcessor.justcount, postprocessor.PostProcessor.maxEntries, output.FullOutput.maxEntries, SiStripPI.min, postprocessor.PostProcessor.modules, HCalEndcapAlgo.modules, TritonService::Model.modules, SequenceVisitors.NodeVisitor.modules, EcalABAnalyzer.modules, ClusterSummary.modules, EcalTestPulseAnalyzer.modules, EcalLaserAnalyzer2.modules, EcalLaserAnalyzer.modules, DDHCalEndcapAlgo.modules, FastTimerService::GroupOfModules.modules, FastTimerService::ResourcesPerJob.modules, postprocessor.PostProcessor.noOut, postprocessor.PostProcessor.outputbranchsel, DMRplotter.DMRplotter.outputDir, postprocessor.PostProcessor.outputDir, Trend.outputDir, PlotAlignmentValidation.outputDir, postprocessor.PostProcessor.postfix, cmsswPreprocessor.CmsswPreprocessor.prefetch, postprocessor.PostProcessor.prefetch, edmTracerCompactLogViewer.Activity.prefetch, postprocessor.PostProcessor.prefetchFile(), preskimming.preSkim(), print(), core.ProvenanceAnalyzer.ProvenanceAnalyzer.provenance, cond::UserLogInfo.provenance, postprocessor.PostProcessor.provenance, cond::LogDBEntry_t.provenance, FastTimerService_cff.range, python.rootplot.root2matplotlib.replace(), cuy.save, submitPVValidationJobs.split(), and nano_mu_digi_cff.strip.

98  def run(self):
99  outpostfix = self.postfix if self.postfix is not None else (
100  "_Friend" if self.friend else "_Skim")
101  if not self.noOut:
102 
103  if self.compression != "none":
104  ROOT.gInterpreter.ProcessLine("#include <Compression.h>")
105  (algo, level) = self.compression.split(":")
106  compressionLevel = int(level)
107  if algo == "LZMA":
108  compressionAlgo = ROOT.ROOT.kLZMA
109  elif algo == "ZLIB":
110  compressionAlgo = ROOT.ROOT.kZLIB
111  elif algo == "LZ4":
112  compressionAlgo = ROOT.ROOT.kLZ4
113  else:
114  raise RuntimeError("Unsupported compression %s" % algo)
115  else:
116  compressionLevel = 0
117  print("Will write selected trees to " + self.outputDir)
118  if not self.justcount:
119  if not os.path.exists(self.outputDir):
120  os.system("mkdir -p " + self.outputDir)
121  else:
122  compressionLevel = 0
123 
124  if self.noOut:
125  if len(self.modules) == 0:
126  raise RuntimeError(
127  "Running with --noout and no modules does nothing!")
128 
129  # Open histogram file, if desired
130  if (self.histFileName is not None and self.histDirName is None) or (self.histFileName is None and self.histDirName is not None):
131  raise RuntimeError(
132  "Must specify both histogram file and histogram directory!")
133  elif self.histFileName is not None and self.histDirName is not None:
134  self.histFile = ROOT.TFile.Open(self.histFileName, "RECREATE")
135  else:
136  self.histFile = None
137 
138  for m in self.modules:
139  if hasattr(m, 'writeHistFile') and m.writeHistFile:
140  m.beginJob(histFile=self.histFile,
141  histDirName=self.histDirName)
142  else:
143  m.beginJob()
144 
145  fullClone = (len(self.modules) == 0)
146  outFileNames = []
147  t0 = time.time()
148  totEntriesRead = 0
149  for fname in self.inputFiles:
150  ffnames = []
151  if "," in fname:
152  fnames = fname.split(',')
153  fname, ffnames = fnames[0], fnames[1:]
154 
155  fname = fname.strip()
156 
157  # Convert LFN to PFN if needed; this requires edmFileUtil to be present in $PATH
158  if fname.startswith('/store/') :
159  fname = subprocess.check_output(['edmFileUtil', '-d', '-f '+fname]).decode("utf-8").strip()
160 
161  # open input file
162  if self.prefetch:
163  ftoread, toBeDeleted = self.prefetchFile(fname)
164  inFile = ROOT.TFile.Open(ftoread)
165  else:
166  inFile = ROOT.TFile.Open(fname)
167 
168  # get input tree
169  inTree = inFile.Get("Events")
170  if inTree is None:
171  inTree = inFile.Get("Friends")
172  nEntries = min(inTree.GetEntries() -
173  self.firstEntry, self.maxEntries)
174  totEntriesRead += nEntries
175  # pre-skimming
176  elist, jsonFilter = preSkim(
177  inTree, self.json, self.cut, maxEntries=self.maxEntries, firstEntry=self.firstEntry)
178  if self.justcount:
179  print('Would select %d / %d entries from %s (%.2f%%)' % (elist.GetN() if elist else nEntries, nEntries, fname, (elist.GetN() if elist else nEntries) / (0.01 * nEntries) if nEntries else 0))
180  if self.prefetch:
181  if toBeDeleted:
182  os.unlink(ftoread)
183  continue
184  else:
185  print('Pre-select %d entries out of %s (%.2f%%)' % (elist.GetN() if elist else nEntries, nEntries, (elist.GetN() if elist else nEntries) / (0.01 * nEntries) if nEntries else 0))
186  inAddFiles = []
187  inAddTrees = []
188  for ffname in ffnames:
189  inAddFiles.append(ROOT.TFile.Open(ffname))
190  inAddTree = inAddFiles[-1].Get("Events")
191  if inAddTree is None:
192  inAddTree = inAddFiles[-1].Get("Friends")
193  inAddTrees.append(inAddTree)
194  inTree.AddFriend(inAddTree)
195 
196  if fullClone:
197  # no need of a reader (no event loop), but set up the elist if available
198  if elist:
199  inTree.SetEntryList(elist)
200  else:
201  # initialize reader
202  if elist:
203  inTree = InputTree(inTree, elist)
204  else:
205  inTree = InputTree(inTree)
206 
207  # prepare output file
208  if not self.noOut:
209  outFileName = os.path.join(self.outputDir, os.path.basename(
210  fname).replace(".root", outpostfix + ".root"))
211  outFile = ROOT.TFile.Open(
212  outFileName, "RECREATE", "", compressionLevel)
213  outFileNames.append(outFileName)
214  if compressionLevel:
215  outFile.SetCompressionAlgorithm(compressionAlgo)
216  # prepare output tree
217  if self.friend:
218  outTree = FriendOutput(inFile, inTree, outFile)
219  else:
220  firstEntry = 0 if fullClone and elist else self.firstEntry
221  outTree = FullOutput(
222  inFile,
223  inTree,
224  outFile,
225  branchSelection=self.branchsel,
226  outputbranchSelection=self.outputbranchsel,
227  fullClone=fullClone,
228  maxEntries=self.maxEntries,
229  firstEntry=firstEntry,
230  jsonFilter=jsonFilter,
231  provenance=self.provenance)
232  else:
233  outFile = None
234  outTree = None
235  if self.branchsel:
236  self.branchsel.selectBranches(inTree)
237 
238  # process events, if needed
239  if not fullClone and not (elist and elist.GetN() == 0):
240  eventRange = range(self.firstEntry, self.firstEntry +
241  nEntries) if nEntries > 0 and not elist else None
242  (nall, npass, timeLoop) = eventLoop(
243  self.modules, inFile, outFile, inTree, outTree,
244  eventRange=eventRange, maxEvents=self.maxEntries
245  )
246  print('Processed %d preselected entries from %s (%s entries). Finally selected %d entries' % (nall, fname, nEntries, npass))
247  elif outTree is not None:
248  nall = nEntries
249  print('Selected %d / %d entries from %s (%.2f%%)' % (outTree.tree().GetEntries(), nall, fname, outTree.tree().GetEntries() / (0.01 * nall) if nall else 0))
250 
251  # now write the output
252  if not self.noOut:
253  outTree.write()
254  outFile.Close()
255  print("Done %s" % outFileName)
256  if self.jobReport:
257  self.jobReport.addInputFile(fname, nall)
258  if self.prefetch:
259  if toBeDeleted:
260  os.unlink(ftoread)
261 
262  for m in self.modules:
263  m.endJob()
264 
265  print("Total time %.1f sec. to process %i events. Rate = %.1f Hz." % ((time.time() - t0), totEntriesRead, totEntriesRead / (time.time() - t0)))
266 
267  if self.haddFileName:
268  haddnano = "./haddnano.py" if os.path.isfile(
269  "./haddnano.py") else "haddnano.py"
270  os.system("%s %s %s" %
271  (haddnano, self.haddFileName, " ".join(outFileNames)))
272  if self.jobReport:
273  self.jobReport.addOutputFile(self.haddFileName)
274  self.jobReport.save()
275 
def replace(string, replacements)
def eventLoop(filename)
Definition: example.py:20
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def preSkim(tree, jsonInput=None, cutstring=None, maxEntries=None, firstEntry=0)
Definition: preskimming.py:59
bool decode(bool &, std::string_view)
Definition: types.cc:72
save
Definition: cuy.py:1164
def InputTree(tree, entrylist=ROOT.MakeNullPointer(ROOT.TEntryList))
T * Get(Args... args)
Definition: Trend.h:122

Member Data Documentation

◆ branchsel

postprocessor.PostProcessor.branchsel

Definition at line 44 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ compression

postprocessor.PostProcessor.compression

Definition at line 29 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ cut

postprocessor.PostProcessor.cut

Definition at line 27 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ firstEntry

postprocessor.PostProcessor.firstEntry

Definition at line 57 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ friend

postprocessor.PostProcessor.friend

Definition at line 33 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ haddFileName

postprocessor.PostProcessor.haddFileName

Definition at line 37 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ histDirName

postprocessor.PostProcessor.histDirName

Definition at line 39 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ histFile

postprocessor.PostProcessor.histFile

Definition at line 38 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ histFileName

postprocessor.PostProcessor.histFileName

Definition at line 53 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ inputFiles

postprocessor.PostProcessor.inputFiles

Definition at line 26 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ jobReport

postprocessor.PostProcessor.jobReport

Definition at line 36 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ json

postprocessor.PostProcessor.json

Definition at line 31 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ justcount

postprocessor.PostProcessor.justcount

Definition at line 34 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ longTermCache

postprocessor.PostProcessor.longTermCache

Definition at line 60 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.prefetchFile().

◆ maxEntries

postprocessor.PostProcessor.maxEntries

Definition at line 56 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ modules

postprocessor.PostProcessor.modules

Definition at line 28 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ noOut

postprocessor.PostProcessor.noOut

Definition at line 32 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ outputbranchsel

postprocessor.PostProcessor.outputbranchsel

Definition at line 46 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ outputDir

postprocessor.PostProcessor.outputDir

Definition at line 25 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ postfix

postprocessor.PostProcessor.postfix

◆ prefetch

postprocessor.PostProcessor.prefetch

Definition at line 58 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ provenance

postprocessor.PostProcessor.provenance

Definition at line 35 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().