CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
postprocessor.PostProcessor Class Reference

Public Member Functions

def __init__ (self, outputDir, inputFiles, cut=None, branchsel=None, modules=[], compression="LZMA:9", friend=False, postfix=None, jsonInput=None, noOut=False, justcount=False, provenance=False, haddFileName=None, fwkJobReport=False, histFileName=None, histDirName=None, outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False, longTermCache=False)
 
def prefetchFile (self, fname, verbose=True)
 
def run (self)
 

Public Attributes

 branchsel
 
 compression
 
 cut
 
 firstEntry
 
 friend
 
 haddFileName
 
 histDirName
 
 histFile
 
 histFileName
 
 inputFiles
 
 jobReport
 
 json
 
 justcount
 
 longTermCache
 
 maxEntries
 
 modules
 
 noOut
 
 outputbranchsel
 
 outputDir
 
 postfix
 
 prefetch
 
 provenance
 

Detailed Description

Definition at line 16 of file postprocessor.py.

Constructor & Destructor Documentation

◆ __init__()

def postprocessor.PostProcessor.__init__ (   self,
  outputDir,
  inputFiles,
  cut = None,
  branchsel = None,
  modules = [],
  compression = "LZMA:9",
  friend = False,
  postfix = None,
  jsonInput = None,
  noOut = False,
  justcount = False,
  provenance = False,
  haddFileName = None,
  fwkJobReport = False,
  histFileName = None,
  histDirName = None,
  outputbranchsel = None,
  maxEntries = None,
  firstEntry = 0,
  prefetch = False,
  longTermCache = False 
)

Definition at line 24 of file postprocessor.py.

24  ):
25  self.outputDir = outputDir
26  self.inputFiles = inputFiles
27  self.cut = cut
28  self.modules = modules
29  self.compression = compression
30  self.postfix = postfix
31  self.json = jsonInput
32  self.noOut = noOut
33  self.friend = friend
34  self.justcount = justcount
35  self.provenance = provenance
36  self.jobReport = JobReport() if fwkJobReport else None
37  self.haddFileName = haddFileName
38  self.histFile = None
39  self.histDirName = None
40  if self.jobReport and not self.haddFileName:
41  print("Because you requested a FJR we assume you want the final " \
42  "hadd. No name specified for the output file, will use tree.root")
43  self.haddFileName = "tree.root"
44  self.branchsel = BranchSelection(branchsel) if branchsel else None
45  if outputbranchsel is not None:
46  self.outputbranchsel = BranchSelection(outputbranchsel)
47  elif outputbranchsel is None and branchsel is not None:
48  # Use the same branches in the output as in input
49  self.outputbranchsel = BranchSelection(branchsel)
50  else:
51  self.outputbranchsel = None
52 
53  self.histFileName = histFileName
54  self.histDirName = histDirName
55  # 2^63 - 1, largest int64
56  self.maxEntries = maxEntries if maxEntries else 9223372036854775807
57  self.firstEntry = firstEntry
58  self.prefetch = prefetch # prefetch files to TMPDIR using xrdcp
59  # keep cached files across runs (it's then up to you to clean up the temp)
60  self.longTermCache = longTermCache
61 
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47

Member Function Documentation

◆ prefetchFile()

def postprocessor.PostProcessor.prefetchFile (   self,
  fname,
  verbose = True 
)

Definition at line 62 of file postprocessor.py.

References join(), postprocessor.PostProcessor.longTermCache, print(), and python.rootplot.root2matplotlib.replace().

Referenced by postprocessor.PostProcessor.run().

62  def prefetchFile(self, fname, verbose=True):
63  tmpdir = os.environ['TMPDIR'] if 'TMPDIR' in os.environ else "/tmp"
64  if not fname.startswith("root://"):
65  return fname, False
66  rndchars = "".join([hex(i)[2:] for i in bytearray(os.urandom(8))]) \
67  if not self.longTermCache else "long_cache-id%d-%s" \
68  % (os.getuid(), hashlib.sha1(fname.encode('utf-8')).hexdigest())
69  localfile = "%s/%s-%s.root" \
70  % (tmpdir, os.path.basename(fname).replace(".root", ""), rndchars)
71  if self.longTermCache and os.path.exists(localfile):
72  if verbose:
73  print("Filename %s is already available in local path %s " \
74  % (fname, localfile))
75  return localfile, False
76  try:
77  if verbose:
78  print("Filename %s is remote, will do a copy to local path %s"\
79  % (fname, localfile))
80  start = time.time()
81  subprocess.check_output(["xrdcp", "-f", "-N", fname, localfile])
82  if verbose:
83  print("Time used for transferring the file locally: %.2f s"\
84  % (time.time() - start))
85  return localfile, (not self.longTermCache)
86  except:
87  if verbose:
88  print("Error: could not save file locally, will run from remote")
89  if os.path.exists(localfile):
90  if verbose:
91  print("Deleting partially transferred file %s" % localfile)
92  try:
93  os.unlink(localfile)
94  except:
95  pass
96  return fname, False
97 
def replace(string, replacements)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21

◆ run()

def postprocessor.PostProcessor.run (   self)

Definition at line 98 of file postprocessor.py.

References postprocessor.PostProcessor.branchsel, BCoptions.compression, postprocessor.PostProcessor.compression, postprocessor.PostProcessor.cut, reco::parser::Grammar::definition< ScannerT >.cut, TriggerObjectTableProducer::SelectedObject.cut, edm.decode(), example.eventLoop(), postprocessor.PostProcessor.firstEntry, output.FullOutput.firstEntry, StripValidationPlots.firstEntry, postprocessor.PostProcessor.friend, Get(), postprocessor.PostProcessor.haddFileName, postprocessor.PostProcessor.histDirName, eventloop.Module.histFile, postprocessor.PostProcessor.histFile, postprocessor.PostProcessor.histFileName, postprocessor.PostProcessor.inputFiles, sistrip::EnsembleCalibrationLA.inputFiles, sistrip::MeasureLA.inputFiles, VarParsing.VarParsing.inputFiles, treeReaderArrayTools.InputTree(), createfilelist.int, WorkFlowRunner.WorkFlowRunner.jobReport, postprocessor.PostProcessor.jobReport, join(), postprocessor.PostProcessor.json, config.DataComponent.json, SummaryOutputProducer.json, SummaryOutputProducer::GenericSummary.json, postprocessor.PostProcessor.justcount, postprocessor.PostProcessor.maxEntries, output.FullOutput.maxEntries, SiStripPI.min, postprocessor.PostProcessor.modules, HCalEndcapAlgo.modules, TritonService::Model.modules, SequenceVisitors.NodeVisitor.modules, EcalABAnalyzer.modules, ClusterSummary.modules, EcalTestPulseAnalyzer.modules, EcalLaserAnalyzer2.modules, EcalLaserAnalyzer.modules, DDHCalEndcapAlgo.modules, FastTimerService::GroupOfModules.modules, FastTimerService::ResourcesPerJob.modules, postprocessor.PostProcessor.noOut, postprocessor.PostProcessor.outputbranchsel, DMRplotter.DMRplotter.outputDir, postprocessor.PostProcessor.outputDir, Trend.outputDir, PlotAlignmentValidation.outputDir, postprocessor.PostProcessor.postfix, cmsswPreprocessor.CmsswPreprocessor.prefetch, postprocessor.PostProcessor.prefetch, edmModuleAllocMonitorAnalyze.Activity.prefetch, edmTracerCompactLogViewer.Activity.prefetch, postprocessor.PostProcessor.prefetchFile(), preskimming.preSkim(), print(), core.ProvenanceAnalyzer.ProvenanceAnalyzer.provenance, cond::UserLogInfo.provenance, postprocessor.PostProcessor.provenance, cond::LogDBEntry_t.provenance, FastTimerService_cff.range, python.rootplot.root2matplotlib.replace(), cuy.save, submitPVValidationJobs.split(), and nano_mu_digi_cff.strip.

98  def run(self):
99  outpostfix = self.postfix if self.postfix is not None else (
100  "_Friend" if self.friend else "_Skim")
101  if not self.noOut:
102 
103  if self.compression != "none":
104  ROOT.gInterpreter.ProcessLine("#include <Compression.h>")
105  (algo, level) = self.compression.split(":")
106  compressionLevel = int(level)
107  if algo == "LZMA":
108  compressionAlgo = ROOT.ROOT.kLZMA
109  elif algo == "ZLIB":
110  compressionAlgo = ROOT.ROOT.kZLIB
111  elif algo == "LZ4":
112  compressionAlgo = ROOT.ROOT.kLZ4
113  else:
114  raise RuntimeError("Unsupported compression %s" % algo)
115  else:
116  compressionLevel = 0
117  print("Will write selected trees to " + self.outputDir)
118  if not self.justcount:
119  if not os.path.exists(self.outputDir):
120  os.system("mkdir -p " + self.outputDir)
121  else:
122  compressionLevel = 0
123 
124  if self.noOut:
125  if len(self.modules) == 0:
126  raise RuntimeError(
127  "Running with --noout and no modules does nothing!")
128 
129  # Open histogram file, if desired
130  if (self.histFileName is not None and self.histDirName is None) or (self.histFileName is None and self.histDirName is not None):
131  raise RuntimeError(
132  "Must specify both histogram file and histogram directory!")
133  elif self.histFileName is not None and self.histDirName is not None:
134  self.histFile = ROOT.TFile.Open(self.histFileName, "RECREATE")
135  else:
136  self.histFile = None
137 
138  for m in self.modules:
139  if hasattr(m, 'writeHistFile') and m.writeHistFile:
140  m.beginJob(histFile=self.histFile,
141  histDirName=self.histDirName)
142  else:
143  m.beginJob()
144 
145  fullClone = (len(self.modules) == 0)
146  outFileNames = []
147  t0 = time.time()
148  totEntriesRead = 0
149  for fname in self.inputFiles:
150  ffnames = []
151  if "," in fname:
152  fnames = fname.split(',')
153  fname, ffnames = fnames[0], fnames[1:]
154 
155  fname = fname.strip()
156 
157  # Convert LFN to PFN if needed; this requires edmFileUtil to be present in $PATH
158  if fname.startswith('/store/') :
159  fname = subprocess.check_output(['edmFileUtil', '-d', '-f '+fname]).decode("utf-8").strip()
160 
161  # open input file
162  print(time.strftime("%d-%b-%Y %H:%M:%S %Z", time.localtime()), " Initiating request to open file %s" %(fname), flush=True) # CMSSW-syle message, required by eos caching scripts
163  if self.prefetch:
164  ftoread, toBeDeleted = self.prefetchFile(fname)
165  inFile = ROOT.TFile.Open(ftoread)
166  else:
167  inFile = ROOT.TFile.Open(fname)
168 
169  # get input tree
170  inTree = inFile.Get("Events")
171  if inTree is None:
172  inTree = inFile.Get("Friends")
173  nEntries = min(inTree.GetEntries() -
174  self.firstEntry, self.maxEntries)
175  totEntriesRead += nEntries
176  # pre-skimming
177  elist, jsonFilter = preSkim(
178  inTree, self.json, self.cut, maxEntries=self.maxEntries, firstEntry=self.firstEntry)
179  if self.justcount:
180  print('Would select %d / %d entries from %s (%.2f%%)' % (elist.GetN() if elist else nEntries, nEntries, fname, (elist.GetN() if elist else nEntries) / (0.01 * nEntries) if nEntries else 0))
181  if self.prefetch:
182  if toBeDeleted:
183  os.unlink(ftoread)
184  continue
185  else:
186  print('Pre-select %d entries out of %s (%.2f%%)' % (elist.GetN() if elist else nEntries, nEntries, (elist.GetN() if elist else nEntries) / (0.01 * nEntries) if nEntries else 0))
187  inAddFiles = []
188  inAddTrees = []
189  for ffname in ffnames:
190  inAddFiles.append(ROOT.TFile.Open(ffname))
191  inAddTree = inAddFiles[-1].Get("Events")
192  if inAddTree is None:
193  inAddTree = inAddFiles[-1].Get("Friends")
194  inAddTrees.append(inAddTree)
195  inTree.AddFriend(inAddTree)
196 
197  if fullClone:
198  # no need of a reader (no event loop), but set up the elist if available
199  if elist:
200  inTree.SetEntryList(elist)
201  else:
202  # initialize reader
203  if elist:
204  inTree = InputTree(inTree, elist)
205  else:
206  inTree = InputTree(inTree)
207 
208  # prepare output file
209  if not self.noOut:
210  outFileName = os.path.join(self.outputDir, os.path.basename(
211  fname).replace(".root", outpostfix + ".root"))
212  outFile = ROOT.TFile.Open(
213  outFileName, "RECREATE", "", compressionLevel)
214  outFileNames.append(outFileName)
215  if compressionLevel:
216  outFile.SetCompressionAlgorithm(compressionAlgo)
217  # prepare output tree
218  if self.friend:
219  outTree = FriendOutput(inFile, inTree, outFile)
220  else:
221  firstEntry = 0 if fullClone and elist else self.firstEntry
222  outTree = FullOutput(
223  inFile,
224  inTree,
225  outFile,
226  branchSelection=self.branchsel,
227  outputbranchSelection=self.outputbranchsel,
228  fullClone=fullClone,
229  maxEntries=self.maxEntries,
230  firstEntry=firstEntry,
231  jsonFilter=jsonFilter,
232  provenance=self.provenance)
233  else:
234  outFile = None
235  outTree = None
236  if self.branchsel:
237  self.branchsel.selectBranches(inTree)
238 
239  # process events, if needed
240  if not fullClone and not (elist and elist.GetN() == 0):
241  eventRange = range(self.firstEntry, self.firstEntry +
242  nEntries) if nEntries > 0 and not elist else None
243  (nall, npass, timeLoop) = eventLoop(
244  self.modules, inFile, outFile, inTree, outTree,
245  eventRange=eventRange, maxEvents=self.maxEntries
246  )
247  print('Processed %d preselected entries from %s (%s entries). Finally selected %d entries' % (nall, fname, nEntries, npass))
248  elif outTree is not None:
249  nall = nEntries
250  print('Selected %d / %d entries from %s (%.2f%%)' % (outTree.tree().GetEntries(), nall, fname, outTree.tree().GetEntries() / (0.01 * nall) if nall else 0))
251 
252  # now write the output
253  if not self.noOut:
254  outTree.write()
255  outFile.Close()
256  print("Done %s" % outFileName)
257  if self.jobReport:
258  self.jobReport.addInputFile(fname, nall)
259  if self.prefetch:
260  if toBeDeleted:
261  os.unlink(ftoread)
262 
263  for m in self.modules:
264  m.endJob()
265 
266  # close histogram file
267  if self.histFile != None:
268  self.histFile.Close()
269 
270  print("Total time %.1f sec. to process %i events. Rate = %.1f Hz." % ((time.time() - t0), totEntriesRead, totEntriesRead / (time.time() - t0)))
271 
272  if self.haddFileName:
273  haddnano = "./haddnano.py" if os.path.isfile(
274  "./haddnano.py") else "haddnano.py"
275  os.system("%s %s %s" %
276  (haddnano, self.haddFileName, " ".join(outFileNames)))
277  if self.jobReport:
278  self.jobReport.addOutputFile(self.haddFileName)
279  self.jobReport.save()
280 
def replace(string, replacements)
def eventLoop(filename)
Definition: example.py:20
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def preSkim(tree, jsonInput=None, cutstring=None, maxEntries=None, firstEntry=0)
Definition: preskimming.py:59
bool decode(bool &, std::string_view)
Definition: types.cc:72
save
Definition: cuy.py:1164
def InputTree(tree, entrylist=ROOT.MakeNullPointer(ROOT.TEntryList))
T * Get(Args... args)
Definition: Trend.h:122

Member Data Documentation

◆ branchsel

postprocessor.PostProcessor.branchsel

Definition at line 44 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ compression

postprocessor.PostProcessor.compression

Definition at line 29 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ cut

postprocessor.PostProcessor.cut

Definition at line 27 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ firstEntry

postprocessor.PostProcessor.firstEntry

Definition at line 57 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ friend

postprocessor.PostProcessor.friend

Definition at line 33 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ haddFileName

postprocessor.PostProcessor.haddFileName

Definition at line 37 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ histDirName

postprocessor.PostProcessor.histDirName

Definition at line 39 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ histFile

postprocessor.PostProcessor.histFile

Definition at line 38 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ histFileName

postprocessor.PostProcessor.histFileName

Definition at line 53 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ inputFiles

postprocessor.PostProcessor.inputFiles

Definition at line 26 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ jobReport

postprocessor.PostProcessor.jobReport

Definition at line 36 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ json

postprocessor.PostProcessor.json

Definition at line 31 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ justcount

postprocessor.PostProcessor.justcount

Definition at line 34 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ longTermCache

postprocessor.PostProcessor.longTermCache

Definition at line 60 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.prefetchFile().

◆ maxEntries

postprocessor.PostProcessor.maxEntries

Definition at line 56 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ modules

postprocessor.PostProcessor.modules

Definition at line 28 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ noOut

postprocessor.PostProcessor.noOut

Definition at line 32 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ outputbranchsel

postprocessor.PostProcessor.outputbranchsel

Definition at line 46 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ outputDir

postprocessor.PostProcessor.outputDir

Definition at line 25 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ postfix

postprocessor.PostProcessor.postfix

◆ prefetch

postprocessor.PostProcessor.prefetch

Definition at line 58 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().

◆ provenance

postprocessor.PostProcessor.provenance

Definition at line 35 of file postprocessor.py.

Referenced by postprocessor.PostProcessor.run().