CMS 3D CMS Logo

das-up-to-nevents.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 import pycurl
3 from io import BytesIO
4 import pycurl
5 import ast
6 import subprocess
7 import pandas as pd
8 import argparse
9 from bs4 import BeautifulSoup
10 import numpy as np
11 import os
12 import json
13 import sys
14 
15 
# Base locations of the DQM certification ("Golden" json) files:
# the eos path is tried first, the web URL serves as fallback mirror.
base_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification/"
base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/"
18 
def get_url_clean(url):
    """Fetch *url* with pycurl and return the page's visible text.

    The raw response is parsed with BeautifulSoup (lxml backend) and only
    the plain text content is returned.
    """
    buffer = BytesIO()
    c = pycurl.Curl()
    try:
        c.setopt(c.URL, url)
        c.setopt(c.WRITEDATA, buffer)
        c.perform()
    finally:
        # Release the curl handle even when the transfer raises,
        # otherwise the handle leaks on network errors.
        c.close()

    return BeautifulSoup(buffer.getvalue(), "lxml").text
29 
def das_do_command(cmd):
    """Run *cmd* in a bash shell and return its stdout split into lines.

    The trailing newline of the command output yields a final empty string
    in the returned list.
    """
    # NOTE(review): callers interpolate dataset names into the shell string;
    # inputs come from trusted CLI arguments, but beware of shell injection.
    raw = subprocess.check_output(cmd, shell=True, executable="/bin/bash")
    return raw.decode('utf8').split("\n")
33 
def das_file_site(dataset, site):
    """Return a one-column DataFrame ("file") with the files of *dataset*
    hosted at *site*, as reported by dasgoclient."""
    query = "dasgoclient --query='file dataset=%s site=%s'"%(dataset,site)
    file_names = das_do_command(query)
    return pd.DataFrame(file_names, columns=["file"])
40 
def das_file_data(dataset,opt=""):
    """Return a DataFrame with columns "file" and "events" (int) for
    *dataset*, optionally refined by the extra DAS clause *opt*."""
    query = "dasgoclient --query='file dataset=%s %s| grep file.name, file.nevents'"%(dataset,opt)
    rows = []
    for line in das_do_command(query):
        if len(line) > 0:
            tokens = line.split(" ")
            # dasgoclient row: token 0 is the LFN, token 3 the event count
            rows.append([tokens[0], tokens[3]])

    df = pd.DataFrame(rows, columns=["file","events"])
    df.events = df.events.values.astype(int)

    return df
50 
def das_lumi_data(dataset,opt=""):
    """Return a DataFrame with columns "file", "run", "lumis" for *dataset*,
    optionally refined by the extra DAS clause *opt*.

    Note: "lumis" values are the raw dasgoclient strings (e.g. "[1,2,3]"),
    not parsed lists.
    """
    query = "dasgoclient --query='file,lumi,run dataset=%s %s'"%(dataset,opt)
    rows = [line.split(" ") for line in das_do_command(query) if len(line) > 0]

    return pd.DataFrame(rows, columns=["file","run","lumis"])
60 
def das_run_events_data(dataset,run,opt=""):
    """Return the total number of events (int) of *dataset* in *run*,
    using the DAS sum(file.nevents) aggregator."""
    query = "dasgoclient --query='file dataset=%s run=%s %s | sum(file.nevents) '"%(dataset,run,opt)
    first_line = das_do_command(query)[0]

    # The line looks like "sum(file.nevents) = <N>"; drop the label token(s)
    # and convert the first remaining non-empty token to an integer.
    tokens = [t for t in first_line.split(" ") if "sum" not in t]
    nonempty = [t for t in tokens if len(t) > 0]

    return int(nonempty[0])
69 
def das_run_data(dataset,opt=""):
    """Return the runs of *dataset* as a list of strings (one per output
    line of dasgoclient, including a trailing empty string)."""
    query = "dasgoclient --query='run dataset=%s %s '"%(dataset,opt)
    return das_do_command(query)
75 
def no_intersection():
    """Report that the golden json and the dataset share no runs and abort.

    NOTE(review): the listing extraction dropped this "def" line (internal
    line 76 is missing); restored so the orphaned body is a valid function.
    Relies on the module-level names ``best_json`` and ``dataset`` being set
    by the __main__ section before this is reached.
    """
    print("No intersection between:")
    print(" - json : ", best_json)
    print(" - dataset: ", dataset)
    print("Exiting.")
    sys.exit(1)
82 
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset','-d', default=None, help="Dataset Name (e.g. '/DisplacedJet/Run2024C-v1/RAW' )",type=str,required=True)
    parser.add_argument('--threshold','-t', help ="Event threshold per file",type=int,default=-1)
    parser.add_argument('--events','-e', help ="Tot number of events targeted",type=int,default=-1)
    parser.add_argument('--outfile','-o', help='Dump results to file', type=str, default=None)
    parser.add_argument('--pandas', '-pd',action='store_true',help="Store the whole dataset (no event or threshold cut) in a csv")
    parser.add_argument('--proxy','-p', help='Allow to parse a x509 proxy if needed', type=str, default=None)
    parser.add_argument('--site','-s', help='Only data at specific site', type=str, default=None)
    parser.add_argument('--precheck','-pc', action='store_true', help='Check run per run before building the dataframes, to avoid huge caching.')
    args = parser.parse_args()

    # A grid proxy is mandatory to query DAS.
    if args.proxy is not None:
        os.environ["X509_USER_PROXY"] = args.proxy
    elif "X509_USER_PROXY" not in os.environ:
        print("No X509 proxy set. Exiting.")
        sys.exit(1)

    testing = "JENKINS_PREFIX" in os.environ  # running inside the IB/PR tests
    dataset = args.dataset
    events = args.events
    threshold = args.threshold
    outfile = args.outfile
    site = args.site

    # Derive the certification folder from the dataset name.
    year = dataset.split("Run")[1][2:4] # from 20XX to XX
    PD = dataset.split("/")[1]
    cert_type = "Collisions" + str(year)
    if "Cosmics" in dataset:
        cert_type = "Cosmics" + str(year)
    elif "Commisioning" in dataset:
        cert_type = "Commisioning2020"
    elif "HI" in PD:
        cert_type = "Collisions" + str(year) + "HI"

    cert_path = base_cert_path + cert_type + "/"
    web_fallback = False

    # Prefer the local eos copy of the certification jsons; fall back to
    # the web area when eos is unavailable or the folder is empty.
    if os.path.isdir(cert_path):
        json_list = os.listdir(cert_path)
        if len(json_list) == 0:
            # BUGFIX: was "web_fallback == True" (a no-op comparison), so an
            # empty eos folder never triggered the web fallback and json_list
            # stayed empty, crashing np.argmax below.
            web_fallback = True
        json_list = [c for c in json_list if "Golden" in c and "era" not in c]
        json_list = [c for c in json_list if c.startswith("Cert_C") and c.endswith("json")]
    else:
        web_fallback = True

    if web_fallback:
        cert_url = base_cert_url + cert_type + "/"
        json_list = get_url_clean(cert_url).split("\n")
        json_list = [c for c in json_list if "Golden" in c and "era" not in c and "Cert_C" in c]
        json_list = [[cc for cc in c.split(" ") if cc.startswith("Cert_C") and cc.endswith("json")][0] for c in json_list]

    # the larger the better, assuming file naming schema
    # Cert_X_RunStart_RunFinish_Type.json
    run_ranges = [int(c.split("_")[3]) - int(c.split("_")[2]) for c in json_list]
    latest_json = np.array(json_list[np.argmax(run_ranges)]).reshape(1,-1)[0].astype(str)
    best_json = str(latest_json[0])
    if not web_fallback:
        with open(cert_path + "/" + best_json) as js:
            golden = json.load(js)
    else:
        golden = get_url_clean(cert_url + best_json)
        golden = ast.literal_eval(golden) #converts string to dict

    # golden json with all the lumisections expanded run by run
    golden_flat = {}
    for k in golden:
        R = []
        for r in golden[k]:
            R = R + [f for f in range(r[0],r[1]+1)]
        golden_flat[k] = R

    # let's just check there's an intersection between the
    # dataset and the json
    data_runs = das_run_data(dataset)
    golden_data_runs = [r for r in data_runs if r in golden_flat]

    if (len(golden_data_runs)==0):
        # NOTE(review): the listing extraction dropped this error-exit line
        # (internal line 166, a no_intersection() call); behavior restored.
        print("No intersection between:")
        print(" - json : ", best_json)
        print(" - dataset: ", dataset)
        print("Exiting.")
        sys.exit(1)

    # building the dataframe, cleaning for bad lumis
    golden_data_runs_tocheck = golden_data_runs
    das_opt = ""
    if testing or args.precheck:
        golden_data_runs_tocheck = []
        # Here we check run per run.
        # This implies more dasgoclient queries, but smaller outputs
        # useful when running the IB/PR tests not to have huge
        # query results that have to be cached.

        sum_events = 0

        for r in golden_data_runs:
            sum_events = sum_events + int(das_run_events_data(dataset,r))
            golden_data_runs_tocheck.append(r)
            if events > 0 and sum_events > events:
                break

        das_opt = "run in %s"%(str([int(g) for g in golden_data_runs_tocheck]))

    df = das_lumi_data(dataset,opt=das_opt).merge(das_file_data(dataset,opt=das_opt),on="file",how="inner") # merge file informations with run and lumis

    # parse the raw "[1,2,3]" lumi strings into lists of ints
    df["lumis"] = [[int(ff) for ff in f.replace("[","").replace("]","").split(",")] for f in df.lumis.values]
    df_rs = []
    for r in golden_data_runs_tocheck:
        cut = (df["run"] == r)
        if not any(cut):
            continue

        df_r = df[cut]

        # jumping low event content runs
        if df_r["events"].sum() < threshold:
            continue

        # keep only files whose lumis are ALL in the golden json
        good_lumis = np.array([len([ll for ll in l if ll in golden_flat[r]]) for l in df_r.lumis])
        n_lumis = np.array([len(l) for l in df_r.lumis])
        df_rs.append(df_r[good_lumis==n_lumis])

    if (len(df_rs)==0):
        # NOTE(review): dropped error-exit line (internal 208) restored here too.
        print("No intersection between:")
        print(" - json : ", best_json)
        print(" - dataset: ", dataset)
        print("Exiting.")
        sys.exit(1)

    df = pd.concat(df_rs)
    df.loc[:,"min_lumi"] = [min(f) for f in df.lumis]
    df.loc[:,"max_lumi"] = [max(f) for f in df.lumis]
    df = df.sort_values(["run","min_lumi","max_lumi"])

    if site is not None:
        df = df.merge(das_file_site(dataset,site),on="file",how="inner")

    if args.pandas:
        df.to_csv(dataset.replace("/","")+".csv")

    if events > 0:
        df = df[df["events"] <= events] #jump too big files
        df.loc[:,"sum_evs"] = df.loc[:,"events"].cumsum()
        df = df[df["sum_evs"] < events]

    files = df.file

    if outfile is not None:
        with open(outfile, 'w') as f:
            for line in files:
                f.write(f"{line}\n")
    else:
        print("\n".join(files))

    sys.exit(0)
236 
237 
Definition: merge.py:1
bool any(const std::vector< T > &v, const T &what)
Definition: ECalSD.cc:37
def das_file_site(dataset, site)
def replace(string, replacements)
def das_run_events_data(dataset, run, opt="")
def das_lumi_data(dataset, opt="")
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
bool decode(bool &, std::string_view)
Definition: types.cc:72
def das_run_data(dataset, opt="")
def das_file_data(dataset, opt="")