CMS 3D CMS Logo

dqmiolumiharvest.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import os
4 import json
5 import ROOT
6 import fnmatch
7 import argparse
8 import subprocess
9 import multiprocessing
10 from collections import defaultdict
11 
12 
# URL prefix that is prepended to every input file name before it is opened with ROOT.
ROOTPREFIX = "root://cms-xrd-global.cern.ch/"
#ROOTPREFIX = "root://eoscms//eos/cms" # for more local files

# Command-line interface. The arguments are parsed right here at module level;
# everything below reads its settings from ``args``.
parser = argparse.ArgumentParser(
    description=(
        "Collect MEs for given lumisections from DQMIO data and upload to a DQMGUI. "
        "The from-to lumi range will be shown in an artificial run number of form 1xxxxyyyy, while the run number goes into the lumi number field."
    ),
)

parser.add_argument(
    'dataset',
    help='dataset name, like "/StreamHIExpress/HIRun2018A-Express-v1/DQMIO"',
)
parser.add_argument(
    '-r', '--run',
    type=int,
    help='Run number of run to process',
)
parser.add_argument(
    '-l', '--lumis',
    help='JSON file with runs/lumisecitons to process (golden JSON format)',
)
parser.add_argument(
    '-u', '--upload',
    help='Upload files to this GUI, instead of just creating them. Delete files after upload.',
)
parser.add_argument(
    '-j', '--njobs',
    type=int,
    default=1,
    help='Number of threads to read files',
)
parser.add_argument(
    '-m', '--me',
    action='append',
    default=[],
    help='Glob pattern of MEs to load.',
)
parser.add_argument(
    '--limit',
    type=int,
    default=-1,
    help='Only load up to LIMIT files',
)
parser.add_argument(
    '--perlumionly',
    action='store_true',
    help='Only save MEs that cover exactly one lumisection, and use simplified "run" numbers (10xxxx)',
)
args = parser.parse_args()
28 
29 
30 # we can save a lot of time by only scanning some types, if we know all interesting MEs are of these types.
# Only ME trees whose name is listed here are scanned at all. Restricting the
# set saves a lot of time when all interesting MEs are known to be of these types.
interesting_types = {
    "TH1Fs",
    "TH2Fs",
    # "TH1Ds",
    # "TH2Ds",
    # "TProfiles",
    # "TProfile2Ds",
}

# Glob patterns given via --me; an ME is kept when its name matches any of them.
interesting_mes = args.me
if not interesting_mes:
    print("No --me patterns given. This is fine, but output *will* be empty.")

# Upload target. Any URL containing "https:" is rejected and uploading is disabled.
uploadurl = args.upload
if uploadurl and "https:" in uploadurl:
    print("Refuing to upload to production servers, only http upload to local servers allowed.")
    uploadurl = None
50 
def dasquery(dataset):
    """Ask DAS (via ``dasgoclient``) for the files of a DQMIO dataset.

    Args:
        dataset: dataset name; must end with "DQMIO".

    Returns:
        List of file names as text strings, one per non-empty output line of
        ``dasgoclient``.

    Raises:
        Exception: if the dataset name does not end with "DQMIO".
        subprocess.CalledProcessError: if ``dasgoclient`` exits with an error.
    """
    if not dataset.endswith("DQMIO"):
        raise Exception("This tool probably cannot read the dataset you specified. The name should end with DQMIO.")
    command = ["dasgoclient", "-query=file dataset=%s" % dataset]
    print("Querying das ... %s" % command)
    # universal_newlines=True makes check_output return text instead of bytes
    # on Python 3, so the names can later be concatenated with ROOTPREFIX.
    output = subprocess.check_output(command, universal_newlines=True)
    # Blank lines are not file names; drop them.
    files = [line.strip() for line in output.splitlines() if line.strip()]
    print("Got %d files." % len(files))
    return files
60 
# Input file list, optionally truncated to the first --limit entries.
files = dasquery(args.dataset)
if args.limit > 0:
    files = files[:args.limit]

# lumiranges maps run number -> list of (first, last) lumi ranges to accept.
# An empty list for a run means "the full run"; an empty dict means "everything".
if args.lumis:
    with open(args.lumis) as f:
        j = json.load(f)
    # JSON object keys are strings; rangecheck() looks runs up by integer.
    # dict.items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    lumiranges = {int(run): lumis for run, lumis in j.items()}
elif args.run:
    # let's define no lumis -> full run
    lumiranges = {args.run: []}
else:
    # ... and similarly, no runs -> everything.
    lumiranges = {}
75 
# The output files encode the lumi range in an artificial "run" number.
# fake_run(lumi, endlumi) returns that number as a string; which format is
# used depends on --perlumionly.
perlumionly = True if args.perlumionly else False

if perlumionly:
    def fake_run(lumi, endlumi):
        """Return "1" followed by the lumi zero-padded to 5 digits; endlumi is ignored."""
        return "1%05d" % lumi
else:
    def fake_run(lumi, endlumi):
        """Return "1" followed by lumi and endlumi, each zero-padded to 4 digits."""
        return "1%04d%04d" % (lumi, endlumi)
84 
85 
# Maps the integer in the "Type" branch of the Indices tree to the name of the
# tree that holds MEs of that type. The position in the tuple is the type code.
treenames = dict(enumerate((
    "Ints",
    "Floats",
    "Strings",
    "TH1Fs",
    "TH1Ss",
    "TH1Ds",
    "TH2Fs",
    "TH2Ss",
    "TH2Ds",
    "TH3Fs",
    "TProfiles",
    "TProfile2Ds",
)))
100 
def check_interesting(mename):
    """Return True if ``mename`` matches any glob pattern in ``interesting_mes``.

    Matching uses ``fnmatch.fnmatch``. Returns False (rather than falling off
    the end and returning None) when no pattern matches or no patterns exist.
    """
    return any(fnmatch.fnmatch(mename, pattern) for pattern in interesting_mes)
105 
def rangecheck(run, lumi):
    """Tell whether (run, lumi) is selected by the module-level ``lumiranges``.

    An empty ``lumiranges`` accepts everything. A run that is not listed is
    rejected. A listed run with no ranges accepts every lumi; otherwise the
    lumi must fall inside one of the inclusive (start, end) ranges.
    """
    if not lumiranges:
        return True
    if run not in lumiranges:
        return False
    ranges = lumiranges[run]
    if not ranges:
        return True
    return any(first <= lumi <= last for first, last in ranges)
115 
def create_dir(parent_dir, name):
    """Return the entry ``name`` of ``parent_dir``, creating it with mkdir() if Get() finds nothing."""
    existing = parent_dir.Get(name)
    if existing:
        return existing
    return parent_dir.mkdir(name)
121 
def gotodir(base, path):
    """Descend from ``base`` along ``path`` and cd() into the resulting directory.

    The last element of ``path`` is treated as an object name and is not
    turned into a directory. Missing directories are created on the way.
    """
    target = base
    parents = path[:-1]
    for component in parents:
        target = create_dir(target, component)
    target.cd()
127 
128 
def harvestfile(fname):
    """Copy the selected MEs of one DQMIO file into DQMGUI-style ROOT files.

    Walks the "Indices" tree of the input file. For every index entry whose
    ME type is in ``interesting_types`` and whose run/lumi passes
    ``rangecheck``, the MEs matching ``check_interesting`` are written into a
    new ROOT file named after the (fake) run, the real run and the ME type.
    An output file is only created once the first matching ME is found.

    Args:
        fname: input file name as returned by dasquery(); ROOTPREFIX is
            prepended before opening. Bytes are decoded as UTF-8 first.

    Returns:
        List with the names of the output files that were written.

    Raises:
        IOError: if the input file cannot be opened.
    """
    # File names coming straight from a subprocess may be bytes on Python 3.
    if not isinstance(fname, str):
        fname = fname.decode("utf-8")

    f = ROOT.TFile.Open(ROOTPREFIX + fname)
    if not f or f.IsZombie():
        raise IOError("Could not open input file %s%s" % (ROOTPREFIX, fname))

    try:
        idxtree = getattr(f, "Indices")
        #idxtree.GetEntry._threaded = True # now the blocking call should release the GIL...

        # we have no good way to find out which lumis were processed in a job.
        # so we watch the per-lumi indices and assume that all mentioned lumis
        # are covered in the end-of-job MEs. This cannot work if no per-lumi
        # entry has been seen by the time a per-job entry shows up.
        knownlumis = set()
        files = []

        for i in range(idxtree.GetEntries()):
            idxtree.GetEntry(i)
            run, lumi, metype = idxtree.Run, idxtree.Lumi, idxtree.Type
            if lumi != 0:
                knownlumis.add(lumi)

            typename = treenames[metype]
            if typename not in interesting_types:
                continue

            endrun = run # assume no multi-run files for now
            if lumi == 0: # per-job ME
                if not knownlumis:
                    # The covered lumi range is unknown; skip this entry
                    # instead of crashing on min()/max() of an empty set.
                    print("Skipping per-job %s in %s: no per-lumi entries seen, lumi range unknown." % (typename, fname))
                    continue
                endlumi = max(knownlumis)
                lumi = min(knownlumis)
            else:
                endlumi = lumi

            if not (rangecheck(run, lumi) or rangecheck(endrun, endlumi)):
                continue
            if perlumionly and lumi != endlumi:
                continue

            # we do the saving in here, concurrently with the reading, to avoid
            # needing to copy/move the TH1's.
            # doing a round-trip via JSON would probably also work, but this seems
            # cleaner. For better structure, one could use Generators...
            # but things need to stay in the same process (from multiprocessing).
            fakerun = fake_run(lumi, endlumi)
            filename = "DQM_V0001_R%s__perlumiharvested__perlumi%d_%s_v1__DQMIO.root" % (fakerun, run, typename)
            prefix = ["DQMData", "Run %s" % fakerun]
            # we open the file only on the first found ME, to avoid empty files.
            result_file = None
            subsystems = set()

            # inclusive range -- for 0 entries, row is left out
            firstidx, lastidx = idxtree.FirstIndex, idxtree.LastIndex
            metree = getattr(f, typename)
            # this GetEntry is only to make sure the TTree is initialized correctly
            metree.GetEntry(0)
            # read only the name while scanning; the full entry is loaded on a match
            metree.SetBranchStatus("*",0)
            metree.SetBranchStatus("FullName",1)

            for x in range(firstidx, lastidx+1):
                metree.GetEntry(x)
                mename = str(metree.FullName)
                if check_interesting(mename):
                    # second argument 1: read all branches of this entry
                    metree.GetEntry(x, 1)
                    value = metree.Value

                    # navigate the TDirectory and save the thing again
                    if not result_file:
                        result_file = ROOT.TFile(filename, 'recreate')
                    path = mename.split("/")
                    filepath = prefix + [path[0], "Run summary"] + path[1:]
                    subsystems.add(path[0])
                    gotodir(result_file, filepath)
                    value.Write()

            # if we found a ME and wrote it to a file, finalize the file here.
            if result_file:
                # DQMGUI wants these to show them in the header bar. The folder name
                # in the TDirectory is also checked and has to match the filename,
                # but the headerbar can show anything and uses these magic MEs.
                for subsys in subsystems:
                    # last item is considered object name and ignored
                    gotodir(result_file, prefix + [subsys, "Run summary", "EventInfo", "blub"])
                    s = ROOT.TObjString("<iRun>i=%s</iRun>" % fakerun)
                    s.Write()
                    s = ROOT.TObjString("<iLumiSection>i=%s</iLumiSection>" % run)
                    s.Write()
                    # we could also set iEvent and runStartTimeStamp if we had values.
                result_file.Close()
                files.append(filename)

        return files
    finally:
        # Each pool worker handles many files; release the input file every time.
        f.Close()
215 
def uploadfile(filename):
    """Upload ``filename`` to ``uploadurl`` by running ``visDQMUpload.py``.

    Raises subprocess.CalledProcessError if the upload command fails.
    """
    cmd = ["visDQMUpload.py", uploadurl, filename]
    print("Uploading ... %s" % cmd)
    subprocess.check_call(cmd)
220 
# Harvest all input files in a pool of worker processes. Results arrive in
# completion order; each result is the list of output files for one input file.
pool = multiprocessing.Pool(processes=args.njobs)
ctr = 0
try:
    for outfiles in pool.imap_unordered(harvestfile, files):
    #for mes_to_store in map(harvestfile, files):
        if uploadurl:
            for f in outfiles:
                uploadfile(f)
                # uploaded files are not kept locally
                os.remove(f)
        ctr += 1
        # "\r" with end='' keeps the progress report on a single terminal line
        print("Processed %d files of %d, got %d out files...\r" % (ctr, len(files), len(outfiles)), end='')
finally:
    # Always shut the workers down, also when harvesting or uploading failed.
    pool.terminate()
    pool.join()
print("\nDone.")
def check_interesting(mename)
def harvestfile(fname)
def create_dir(parent_dir, name)
def dasquery(dataset)
def gotodir(base, path)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def fake_run(lumi, endlumi)
def rangecheck(run, lumi)
#define str(s)
def uploadfile(filename)