CMS 3D CMS Logo

submitPVResolutionJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Submits per run Primary Vertex Resolution Alignment validation using the split vertex method,
4 usage:
5 
6 submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
7 '''
8 
9 from __future__ import print_function
10 
11 __author__ = 'Marco Musich'
12 __copyright__ = 'Copyright 2020, CERN CMS'
13 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
14 __license__ = 'Unknown'
15 __maintainer__ = 'Marco Musich'
16 __email__ = 'marco.musich@cern.ch'
17 __version__ = 1
18 
19 import os,sys
20 import getopt
21 import time
22 import json
23 import ROOT
24 import urllib
25 import string
26 import subprocess
27 import pprint
28 import warnings
29 from subprocess import Popen, PIPE
30 import multiprocessing
31 from optparse import OptionParser
32 import os, shlex, shutil, getpass
33 import configparser as ConfigParser
34 
35 CopyRights = '##################################\n'
36 CopyRights += '# submitPVVResolutioJobs.py #\n'
37 CopyRights += '# marco.musich@cern.ch #\n'
38 CopyRights += '# October 2020 #\n'
39 CopyRights += '##################################\n'
40 
41 
def get_status_output(*args, **kwargs):
    """Run a command through subprocess.Popen and return (returncode, stdout, stderr)."""
    process = subprocess.Popen(*args, **kwargs)
    out, err = process.communicate()
    # communicate() waits for termination, so returncode is valid here
    return process.returncode, out, err
48 
def check_proxy():
    """Check if a GRID proxy has been initialized.

    Returns True when 'voms-proxy-info --exists' succeeds, False otherwise.
    The 'def' line was missing from the rendered source and is restored here;
    the function name is fixed by the call site in forward_proxy().
    """
    try:
        # Only the exit code matters; discard all command output.
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout = dump, stderr = dump)
    except subprocess.CalledProcessError:
        return False
    return True
60 
61 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    # Abort immediately when no valid proxy is available.
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # Copy the local proxy file next to the job area as '.user_proxy'.
    proxy_location = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(proxy_location, os.path.join(rundir, ".user_proxy"))
76 
def getFilesForRun(blob):
    """
    returns the list of list files associated with a given dataset for a certain run
    """
    run, dataset = blob[0], blob[1]
    # Query DAS for the files belonging to this (run, dataset) pair.
    query = " dasgoclient -limit=0 -query 'file run=" + run + " dataset=" + dataset + "'"
    proc = Popen(query, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()
    files = out.decode().split('\n')
    # Drop the empty element produced by the trailing newline.
    files.pop()
    return files
88 
89 
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `name`: base name of the scripts to submit
    - `nruns`: number of queued jobs
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the full path of the written submit file.
    """
    template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    # Only forward the proxy when explicitly requested.
    if proxy_path is not None:
        template += """\
+x509userproxy = "{proxy:s}"
"""

    submit_path = os.path.join(path, "job_" + name + ".submit")
    rendered = template.format(script = os.path.join(path, name + "_$(ProcId).sh"),
                               out = name + "_$(ProcId)",
                               jobm = os.path.abspath(path),
                               flavour = "tomorrow",
                               njobs = str(nruns),
                               proxy = proxy_path)
    with open(submit_path, "w") as handle:
        handle.write(rendered)

    return submit_path
124 
125 
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Query brilcalc and return a {run: recorded luminosity} dict.

    Expects CSV output like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    |    73 |  327 | 142418 | 138935 |            19.562 |           18.036 |
    +-------+------+--------+--------+-------------------+------------------+
    and extracts the per-run recorded luminosity (/pb).

    Returns an empty dict when `isRunBased` is False or the BRIL query fails.
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output(
            [homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv"])
    except Exception:
        # was a bare 'except:'; Exception keeps KeyboardInterrupt/SystemExit alive
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip comment/header lines AND empty lines (the latter previously
        # inserted a bogus '' -> '' entry into the cache)
        if line and "#" not in line:
            runToCache = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
165 
166 
def isInJSON(run, jsonfile):
    """Return True when `run` is a key of the JSON lumi mask in `jsonfile`.

    Falls back to True (i.e. use every run) with a warning when the file
    cannot be opened or parsed.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        # was a bare 'except:'; Exception keeps KeyboardInterrupt/SystemExit alive
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
176 
def as_dict(config):
    """Convert a ConfigParser object into a plain nested {section: {option: value}} dict."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
186 
187 
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False):
    '''prepare the batch script, to run on HTCondor'''
    # Optional cmsRun arguments; empty strings when not configured.
    rec_arg = "records=" + config['records'] if 'records' in config else ""
    ext_arg = "external=" + config['external'] if 'external' in config else ""

    script = """#!/bin/bash
CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
echo "The mother directory is $CMSSW_DIR"
export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
#OUT_DIR=$CMSSW_DIR/harvest ## for local storage
OUT_DIR={MYDIR}
LOG_DIR=$CMSSW_DIR/out
LXBATCH_DIR=$PWD
# Check if CMSSW environment is set by checking CMSSW_BASE or other variables
if [[ -z "$CMSSW_BASE" || -z "$CMSSW_VERSION" || -z "$SCRAM_ARCH" ]]; then
    echo "CMSSW environment not detected. Sourcing scramv1 runtime..."
    cd $CMSSW_DIR
    # Assuming you have a valid CMSSW release environment to source
    source /cvmfs/cms.cern.ch/cmsset_default.sh
    eval $(scramv1 runtime -sh)  # This sets the CMSSW environment
else
    echo "CMSSW environment is already set. Continuing..."
fi
cd $LXBATCH_DIR
cp -pr {CFGDIR}/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py TrackCollection={TRKS} GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
# Print the contents of the current directory using $PWD and echo
echo "Contents of the current directory ($PWD):"
echo "$(ls -lh "$PWD")"
""".format(CMSSW_BASE_DIR=theCMSSW_BASE,
           CFGDIR=cfgdir,
           runindex=runindex,
           MYDIR=eosdir,
           KEY=key,
           LUMITORUN=lumiToRun,
           TRKS=tkCollection,
           GT=config['globaltag'],
           EXT=ext_arg,
           REC=rec_arg)

    # When not a unit test, also copy the payload to EOS and stash the logs.
    if not isUnitTest:
        script += """for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
""".format(KEY=key, runindex=runindex)

    return script
233 
234 
def mkdir_eos(out_path):
    """Create `out_path` on EOS recursively, one directory level at a time."""
    print("creating",out_path)
    newpath = '/'
    for level in out_path.split('/'):
        newpath = os.path.join(newpath, level)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            proc = subprocess.Popen("eos mkdir " + newpath, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = proc.communicate()
            #print(out,err)
            proc.wait()

    # now check that the directory exists
    proc = subprocess.Popen("eos ls " + out_path, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = proc.communicate()
    proc.wait()
    if proc.returncode != 0:
        print(out)
258 
def main():
    """Generate per-run PV-resolution cmsRun configs and HTCondor scripts, optionally submitting them.

    Workflow: parse CLI options and the .ini config, forward the GRID proxy,
    query DAS for the runs of the dataset, fetch per-run luminosities from BRIL,
    write one cfg + batch script per (run, config-key), then one submit file per key.
    """
    desc="""This is a description of %prog."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
    parser.add_option('-i','--init', help='ini file', dest='iniPathName', action='store', default="default.ini")
    parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
    parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET', action='store', default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?', dest='isUnitTest', action='store_true', default=False)
    (opts, args) = parser.parse_args()

    global CopyRights
    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    # Output area on EOS; only created when actually submitting.
    eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        # NOTE(review): WrongIniFormatError is not defined in this file —
        # presumably provided elsewhere; verify, otherwise this raises NameError.
        raise WrongIniFormatError(e)

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if(not bool(inputDict)):
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    # Copy the GRID proxy into the working directory so batch jobs can use it.
    forward_proxy(".")

    #runs = commands.getstatusoutput("dasgoclient -query='run dataset="+opts.DATASET+"'")[1].split("\n")
    runs = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
    runs.pop()  # drop the empty string after the trailing newline
    runs.sort()
    print("\n\n Will run on the following runs: \n",runs)

    # List of directories to create
    directories = ["cfg", "BASH", "harvest", "out"]

    for directory in directories:
        os.makedirs(directory, exist_ok=True)

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")
    cfgdir = os.path.join(cwd,"cfg")

    runs.sort()

    if(len(runs)==0):
        if(opts.isUnitTest):
            # An empty DAS answer is acceptable in unit-test mode: exit cleanly.
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run.... please check again the configuration')
    else:
        # get from the DB the int luminosities
        myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)

        if(opts.verbose):
            pprint.pprint(myLumiDB)

        lumimask = inputDict["Input"]["lumimask"]
        print("\n\n Using JSON file:",lumimask)

        tkCollection = inputDict["Input"]["trackcollection"]
        print("\n\n Using trackCollection:", tkCollection)

        mytuple=[]
        print("\n\n First run:",opts.start,"last run:",opts.end)

        # First pass: select the runs inside [start, end] and the lumi mask.
        for run in runs:
            if (int(run)<int(opts.start) or int(run)>int(opts.end)):
                print("excluding run",run)
                continue

            if not isInJSON(run,lumimask):
                continue

            else:
                print("'======> taking run",run)
                mytuple.append((run,opts.DATASET))

        #print mytuple

        # Query DAS for the file lists of all selected runs in parallel.
        pool = multiprocessing.Pool(processes=20)  # start 20 worker processes
        count = pool.map(getFilesForRun,mytuple)

        if(opts.verbose):
            print("printing count")
            pprint.pprint(count)

        # limit the runs in the dictionary to the filtered ones
        file_info = dict(zip([run for run, _ in mytuple], count))

        if(opts.verbose):
            print("printing file_info")
            pprint.pprint(file_info)

        # Second pass: write a cmsRun cfg and a batch script per (run, key).
        count=0
        for run in runs:
            #if(count>10):
            #    continue
            #run = run.strip("[").strip("]")

            if (int(run)<int(opts.start) or int(run)>int(opts.end)):
                print("excluding",run)
                continue

            if not isInJSON(run,lumimask):
                print("=====> excluding run:",run)
                continue

            count=count+1
            files = file_info[run]
            if(opts.verbose):
                print(run, files)

            # Build a Python-list literal of input files for the cfg template.
            listOfFiles='['
            for ffile in files:
                listOfFiles=listOfFiles+"\""+str(ffile)+"\","
            listOfFiles+="]"

            #print(listOfFiles)

            theLumi='1'
            if (run) in myLumiDB:
                theLumi = myLumiDB[run]
                print("run",run," int. lumi:",theLumi,"/pb")
            else:
                # Fall back to 1/pb when BRIL had no entry for this run.
                print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
                theLumi='1'
                print("run",run," int. lumi:",theLumi,"/pb")

            # loop on the dictionary
            for key, value in inputDict.items():
                #print(key,value)
                if "Input" in key:
                    continue
                else:
                    # Section names look like 'prefix:key'; keep the part after ':'.
                    key = key.split(":", 1)[1]
                print("dealing with",key)

                # Paths and variables
                template_file = os.path.join(input_CMSSW_BASE, "src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py")
                output_file = f"./cfg/PrimaryVertexResolution_{key}_{run}_cfg.py"

                # Copy the template file to the destination
                shutil.copy(template_file, output_file)

                # Read and replace placeholders in the copied file
                with open(output_file, 'r') as file:
                    content = file.read()

                # Replace placeholders with actual values
                content = content.replace("XXX_FILES_XXX", listOfFiles)
                content = content.replace("XXX_RUN_XXX", run)
                content = content.replace("YYY_KEY_YYY", key)

                # Write the modified content back to the file
                with open(output_file, 'w') as file:
                    file.write(content)

                scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
                scriptFile = open(scriptFileName,'w')
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE,cfgdir,run,eosdir,theLumi,key,value,tkCollection,opts.isUnitTest))
                scriptFile.close()
                #os.system('chmod +x %s' % scriptFileName)

        # One HTCondor submit file per configuration key; submit only with -s.
        for key, value in inputDict.items():
            if "Input" in key:
                continue
            else:
                key = key.split(":", 1)[1]

            job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)
            os.system("chmod u+x "+bashdir+"/*.sh")

            if opts.submit:
                submissionCommand = "condor_submit "+job_submit_file
                print(submissionCommand)
                os.system(submissionCommand)


if __name__ == "__main__":
    main()
467 
def get_status_output(args, kwargs)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
Definition: TracksSoA.h:90
def replace(string, replacements)
def mkdir_eos(out_path)
method to create recursively directories on EOS
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
Definition: main.py:1
bool decode(bool &, std::string_view)
Definition: types.cc:72
#define str(s)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)