CMS 3D CMS Logo

submitPVResolutionJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Submits per-run Primary Vertex Resolution Alignment validation using the split vertex method,
4 usage:
5 
6 submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
7 '''
8 
9 from __future__ import print_function
10 
11 __author__ = 'Marco Musich'
12 __copyright__ = 'Copyright 2020, CERN CMS'
13 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
14 __license__ = 'Unknown'
15 __maintainer__ = 'Marco Musich'
16 __email__ = 'marco.musich@cern.ch'
17 __version__ = 1
18 
19 import os,sys
20 import getopt
21 import time
22 import json
23 import ROOT
24 import urllib
25 import string
26 import subprocess
27 import pprint
28 import warnings
29 from subprocess import Popen, PIPE
30 import multiprocessing
31 from optparse import OptionParser
32 import os, shlex, shutil, getpass
33 import configparser as ConfigParser
34 
# Banner printed by main() at start-up.  Each text row is centred and padded
# to the width of the '#' rule so that the box lines up, and the first row
# carries the actual name of this script.
_BANNER_WIDTH = 34
_BANNER_ROWS = ('submitPVResolutionJobs.py', 'marco.musich@cern.ch', 'October 2020')

CopyRights = '#' * _BANNER_WIDTH + '\n'
CopyRights += ''.join('#' + row.center(_BANNER_WIDTH - 2) + '#\n' for row in _BANNER_ROWS)
CopyRights += '#' * _BANNER_WIDTH + '\n'
40 
41 
def get_status_output(*args, **kwargs):
    """Run a command with ``subprocess.Popen`` and wait for it to finish.

    Every positional and keyword argument is forwarded untouched to
    ``subprocess.Popen``.

    Returns a ``(returncode, stdout, stderr)`` tuple.  The two streams are
    exactly what ``Popen.communicate()`` hands back, i.e. ``None`` unless the
    matching pipe was requested through ``stdout=`` / ``stderr=``.
    """
    process = subprocess.Popen(*args, **kwargs)
    streams = process.communicate()
    return (process.returncode,) + tuple(streams)
47 
48 
def check_proxy():
    """Check if GRID proxy has been initialized.

    Runs ``voms-proxy-info --exists`` with its output discarded.

    Returns:
        True when the command exits with status 0; False when it exits with
        a non-zero status or cannot be started at all (for instance because
        ``voms-proxy-info`` is not installed on this machine).
    """
    try:
        subprocess.check_call(["voms-proxy-info", "--exists"],
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: the tool ran and reported a failure.
        # OSError: the tool could not be executed; without it there is no
        # usable proxy either, so report "no proxy" instead of a traceback.
        return False
    return True
60 
61 
def forward_proxy(rundir):
    """Copy the GRID proxy to a location visible from the batch system.

    Arguments:
    - `rundir`: directory that receives the copy, stored as ``.user_proxy``

    The source file is the path printed by ``voms-proxy-info --path``.  If
    ``check_proxy()`` reports that no proxy is available, a hint is printed
    and the program exits with status 1.
    """
    if check_proxy():
        proxy_location = subprocess.check_output(["voms-proxy-info", "--path"])
        destination = os.path.join(rundir, ".user_proxy")
        shutil.copyfile(proxy_location.strip(), destination)
        return

    print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
    sys.exit(1)
75 
76 
def getFilesForRun(blob):
    """Return the list of files associated with a given dataset for a certain run.

    Arguments:
    - `blob`: ``(run, dataset)`` pair, packed in one argument so that the
      function can be handed directly to ``Pool.map``

    The list is obtained from ``dasgoclient -limit=0 -query 'file run=<run>
    dataset=<dataset>'``; every non-empty line of its standard output is one
    entry.  The exit status and the error stream of the client are not
    inspected.  If the client cannot be started a warning is issued and an
    empty list is returned.
    """
    query = 'file run={} dataset={}'.format(blob[0], blob[1])
    try:
        # Argument list instead of a shell string: the query reaches the
        # client as one argument whatever characters the dataset contains.
        client = Popen(['dasgoclient', '-limit=0', '-query', query], stdout=PIPE, stderr=PIPE)
    except OSError as error:
        warnings.warn('ATTENTION! Impossible to run dasgoclient: ' + str(error))
        return []

    out, _ = client.communicate()
    # Keep every non-empty line; blindly dropping the last element would lose
    # a file whenever the output does not end with a newline.
    return [line for line in out.decode().splitlines() if line.strip()]
88 
89 
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Writes 'job_<name>.submit' file in `path`.

    Arguments:
    - `path`: job directory; the executable of each job is
      ``<path>/<name>_$(ProcId).sh`` and the .out/.err/.log files go to the
      absolute form of `path`
    - `name`: common stem of the job scripts and of the submit file
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the path of the submit file that was written.
    """
    job_submit_template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    # 'queue' has to come last: submit commands written after it are not
    # applied to the jobs it queues, so the proxy line must precede it.
    job_submit_template += """\
queue {njobs:s}
"""

    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script=os.path.join(path, name + "_$(ProcId).sh"),
                                           out=name + "_$(ProcId)",
                                           jobm=os.path.abspath(path),
                                           flavour="tomorrow",
                                           njobs=str(nruns),
                                           proxy=proxy_path))

    return job_submit_file
124 
125 
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Query brilcalc for the luminosity of the runs between minRun and maxRun.

    Runs ``<homedir>/.local/bin/brilcalc lumi -b "STABLE BEAMS" -u /pb
    --begin <minRun> --end <maxRun> --output-style csv`` and reads its csv
    output: lines containing ``#`` and blank lines are skipped; on every
    other line the text before the first ``:`` of the first column is taken
    as the run and the last column as its luminosity (in /pb, as requested
    with ``-u``; presumably the recorded luminosity -- confirm against the
    csv header printed by brilcalc).

    Arguments:
    - `homedir`: home directory under which brilcalc is installed
    - `minRun`, `maxRun`: first and last run of the query
    - `isRunBased`: when false nothing is queried
    - `verbose`: print the raw output and the resulting dictionary

    Returns a dictionary ``{run: luminosity}`` with both stored as strings.
    It is empty when `isRunBased` is false or when brilcalc cannot be run
    (a warning is issued in that case).
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    command = ["{}/.local/bin/brilcalc".format(homedir), "lumi",
               "-b", "STABLE BEAMS",
               "-u", "/pb",
               "--begin", str(minRun),
               "--end", str(maxRun),
               "--output-style", "csv"]
    try:
        output = subprocess.check_output(command)
    except (OSError, subprocess.CalledProcessError):
        # brilcalc is missing, not executable, or exited with an error:
        # carry on without luminosities rather than abort the submission.
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().splitlines():
        # Blank lines would otherwise be cached under an empty run number.
        if not line.strip() or "#" in line:
            continue
        columns = line.split(",")
        runToCache = columns[0].split(":")[0]
        lumiToCache = columns[-1].strip()
        myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
165 
166 
def isInJSON(run, jsonfile):
    """Tell whether `run` is listed in the lumi mask stored in `jsonfile`.

    Arguments:
    - `run`: run number; an integer is accepted too when the file holds a
      JSON object, because the keys of a JSON object are always strings
    - `jsonfile`: path of the JSON lumi mask

    Returns the result of the membership test.  When the file cannot be
    opened, is not valid JSON, or its content does not support the test, a
    warning is issued and True is returned, so that every run is accepted.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        if isinstance(jsonDATA, dict):
            return str(run) in jsonDATA
        return run in jsonDATA
    except (OSError, ValueError, TypeError):
        # OSError: unreadable file; ValueError: malformed JSON (this includes
        # json.JSONDecodeError); TypeError: unusable path or JSON content.
        # Anything else, such as KeyboardInterrupt, is left to propagate.
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
175 
176 
def as_dict(config):
    """Turn a parsed configuration into a plain dictionary of dictionaries.

    The outer keys are the names returned by ``config.sections()``; each
    inner dictionary maps the names returned by ``config.options(section)``
    to the value ``config.get(section, option)``.
    """
    return {
        section: {
            option: config.get(section, option)
            for option in config.options(section)
        }
        for section in config.sections()
    }
186 
187 
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False):
    """Prepare the batch script, to run on HTCondor.

    Arguments:
    - `theCMSSW_BASE`: CMSSW area; the script works from its
      ``src/Alignment/OfflineValidation/test`` directory
    - `cfgdir`: directory holding ``PrimaryVertexResolution_<key>_<runindex>_cfg.py``
    - `runindex`: run label used in the configuration, log and output names
    - `eosdir`: value written to ``OUT_DIR`` in the script
    - `lumiToRun`: value passed to cmsRun as ``lumi=``
    - `key`: validation label used in the file names
    - `config`: options of the validation; ``globaltag`` is mandatory,
      ``records`` and ``external`` are passed to cmsRun only when present
    - `tkCollection`: value passed to cmsRun as ``TrackCollection=``
    - `isUnitTest`: when true, the lines that copy the ROOT output with
      xrdcp and archive the log are left out

    Returns the text of the bash script.
    """
    global_tag = config['globaltag']
    external_arg = "external=" + config['external'] if 'external' in config else ""
    records_arg = "records=" + config['records'] if 'records' in config else ""

    cfg_name = f"PrimaryVertexResolution_{key}_{runindex}_cfg.py"
    log_stem = f"log_{key}_run{runindex}"

    lines = [
        "#!/bin/bash",
        "#source /afs/cern.ch/cms/caf/setup.sh",
        f"CMSSW_DIR={theCMSSW_BASE}/src/Alignment/OfflineValidation/test",
        'echo "the mother directory is $CMSSW_DIR"',
        "export X509_USER_PROXY=$CMSSW_DIR/.user_proxy",
        "#OUT_DIR=$CMSSW_DIR/harvest ## for local storage",
        f"OUT_DIR={eosdir}",
        "LOG_DIR=$CMSSW_DIR/out",
        "LXBATCH_DIR=`pwd`",
        "cd $CMSSW_DIR",
        "eval `scram runtime -sh`",
        "cd $LXBATCH_DIR",
        f"cp -pr {cfgdir}/{cfg_name} .",
        f"cmsRun {cfg_name} TrackCollection={tkCollection} GlobalTag={global_tag} lumi={lumiToRun} {records_arg} {external_arg} >& {log_stem}.out",
        "ls -lh .",
    ]

    if not isUnitTest:
        # stage-out of the ROOT file and of the compressed log
        lines += [
            f"for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{key}_{runindex}.root ; done",
            f"tar czf {log_stem}.tgz {log_stem}.out",
            "for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done",
        ]

    return "\n".join(lines) + "\n"
224 
225 
def _run_eos(arguments):
    """Run ``eos <arguments>`` and return ``(returncode, stdout, stderr)``.

    When the ``eos`` client cannot be started the return code is 127 and the
    reason is returned in place of the error stream.
    """
    try:
        process = subprocess.Popen(["eos"] + list(arguments),
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as error:
        return 127, b"", str(error).encode()
    # communicate() already waits for the process to terminate
    out, err = process.communicate()
    return process.returncode, out, err


def mkdir_eos(out_path):
    """Method to create recursively directories on EOS.

    Arguments:
    - `out_path`: directory to create

    ``eos mkdir`` is issued for every level of `out_path` from the one that
    contains ``test_out`` downwards; the outcome of these calls is ignored
    (a level may well exist already).  Afterwards ``eos ls`` is run on
    `out_path` and, if it fails, what the client printed is reported.
    Nothing is returned and no exception is raised when ``eos`` fails.
    """
    print("creating", out_path)
    newpath = '/'
    for component in out_path.split('/'):
        newpath = os.path.join(newpath, component)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            _run_eos(["mkdir", newpath])

    # now check that the directory exists
    returncode, out, err = _run_eos(["ls", out_path])
    if returncode != 0:
        message = (err or out).decode(errors="replace").strip()
        print("could not list", out_path, "on EOS:", message)
248 
249 
def main():
    """Prepare (and with ``-s`` submit) the per-run PV resolution jobs.

    Steps, in order:
    - parse the command line and the ini file given with ``-i``;
    - create the EOS output folder when ``-s`` is given;
    - copy the GRID proxy into the current directory;
    - ask DAS for the runs of the dataset given with ``-D`` and keep those
      within [``-b``, ``-e``] that pass the lumi mask named in the
      ``[Input]`` section of the ini file;
    - for every kept run and every other ini section (named
      ``<something>:<key>``) write a cmsRun configuration in ``cfg/`` and a
      job script in ``BASH/``;
    - write one HTCondor submit file per such section and, with ``-s``,
      hand it to ``condor_submit``.

    Raises:
        SystemExit: when the ini file is malformed or yields no section,
            when DAS returns no run during a unit test (exit status 0), or
            when CMSSW_BASE is not set although configurations are needed.
        Exception: when DAS returns no run outside of a unit test.
    """
    desc = """This is a description of %prog."""
    parser = OptionParser(description=desc, version='%prog version 0.1')
    parser.add_option('-s', '--submit', help='job submitted', dest='submit', action='store_true', default=False)
    parser.add_option('-j', '--jobname', help='task name', dest='taskname', action='store', default='myTask')
    parser.add_option('-i', '--init', help='ini file', dest='iniPathName', action='store', default="default.ini")
    parser.add_option('-b', '--begin', help='starting point', dest='start', action='store', default='1')
    parser.add_option('-e', '--end', help='ending point', dest='end', action='store', default='999999')
    parser.add_option('-D', '--Dataset', help='dataset to run upon', dest='DATASET', action='store', default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v', '--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
    parser.add_option('-u', '--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
    opts, _ = parser.parse_args()

    print('\n' + CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir = os.path.join("/store/group/alca_trackeralign", USER, "test_out", opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.Error as e:
        # Report a malformed ini file the same way as an unusable one below.
        raise SystemExit("\n\n ERROR! Wrong format of the ini file " + opts.iniPathName + ": " + str(e) + "\n\n") from e

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if not inputDict:
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    forward_proxy(".")

    # The query goes through the shell so that a missing dasgoclient simply
    # yields no runs (tolerated for unit tests); the dataset is quoted.
    dasCommand = "dasgoclient -query=" + shlex.quote("run dataset=" + opts.DATASET)
    dasOutput = get_status_output(dasCommand, shell=True, stdout=PIPE, stderr=PIPE)[1]
    runs = sorted(line for line in dasOutput.decode().split("\n") if line.strip())
    print("\n\n Will run on the following runs: \n", runs)

    # Create each working directory that is missing, not only when 'cfg' is.
    for folder in ("cfg", "BASH", "harvest", "out"):
        os.makedirs(folder, exist_ok=True)

    cwd = os.getcwd()
    bashdir = os.path.join(cwd, "BASH")
    cfgdir = os.path.join(cwd, "cfg")

    if not runs:
        if opts.isUnitTest:
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        raise Exception('Will not run on any run.... please check again the configuration')

    # get from the DB the int luminosities
    myLumiDB = getLuminosity(HOME, runs[0], runs[-1], True, opts.verbose)
    if opts.verbose:
        pprint.pprint(myLumiDB)

    lumimask = inputDict["Input"]["lumimask"]
    print("\n\n Using JSON file:", lumimask)

    tkCollection = inputDict["Input"]["trackcollection"]
    print("\n\n Using trackCollection:", tkCollection)

    print("\n\n First run:", opts.start, "last run:", opts.end)
    firstRun, lastRun = int(opts.start), int(opts.end)

    # Select the runs once; everything below works on this selection.
    selectedRuns = []
    for run in runs:
        if int(run) < firstRun or int(run) > lastRun:
            print("excluding run", run)
            continue
        if not isInJSON(run, lumimask):
            print("=====> excluding run:", run)
            continue
        print("'======> taking run", run)
        selectedRuns.append(run)

    # query DAS for the files of each selected run, 20 runs at a time
    with multiprocessing.Pool(processes=20) as pool:
        filesPerRun = pool.map(getFilesForRun, [(run, opts.DATASET) for run in selectedRuns])

    if opts.verbose:
        print("printing count")
        pprint.pprint(filesPerRun)

    # limit the runs in the dictionary to the filtered ones
    file_info = dict(zip(selectedRuns, filesPerRun))
    if opts.verbose:
        print("printing file_info")
        pprint.pprint(file_info)

    # (key, options) of every section other than [Input]; the key is the
    # part of the section name that follows the first ':'
    validations = [(section.split(":", 1)[1], options)
                   for section, options in inputDict.items()
                   if "Input" not in section]

    templateText = ""
    if selectedRuns:
        if input_CMSSW_BASE is None:
            raise SystemExit("\n\n ERROR! CMSSW_BASE is not set, cannot locate the configuration template \n\n")
        templatePath = os.path.join(input_CMSSW_BASE, "src", "Alignment", "OfflineValidation", "test",
                                    "PrimaryVertexResolution_templ_cfg.py")
        with open(templatePath) as templateFile:
            templateText = templateFile.read()

    for jobIndex, run in enumerate(selectedRuns):
        files = file_info[run]
        if opts.verbose:
            print(run, files)
        listOfFiles = '[' + ''.join('"' + str(ffile) + '",' for ffile in files) + ']'

        theLumi = myLumiDB.get(run)
        if theLumi is None:
            print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
            theLumi = '1'
        print("run", run, " int. lumi:", theLumi, "/pb")

        # loop on the validations
        for key, value in validations:
            print("dealing with", key)

            # Fill the template in Python: passing the file list through a
            # shell command line fails for runs with very many files.
            cfgText = (templateText.replace("XXX_FILES_XXX", listOfFiles)
                                   .replace("XXX_RUN_XXX", run)
                                   .replace("YYY_KEY_YYY", key))
            cfgFileName = os.path.join(cfgdir, "PrimaryVertexResolution_" + key + "_" + run + "_cfg.py")
            with open(cfgFileName, 'w') as cfgFile:
                cfgFile.write(cfgText)

            # the index in the script name is what $(ProcId) expands to
            scriptFileName = os.path.join(bashdir, "batchHarvester_" + key + "_" + str(jobIndex) + ".sh")
            with open(scriptFileName, 'w') as scriptFile:
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE, cfgdir, run, eosdir, theLumi, key, value, tkCollection, opts.isUnitTest))
            # make the script executable by its owner (chmod u+x)
            os.chmod(scriptFileName, os.stat(scriptFileName).st_mode | 0o100)

    for key, _ in validations:
        job_submit_file = write_HTCondor_submit_file(bashdir, "batchHarvester_" + key, len(selectedRuns), None)

        if opts.submit:
            submissionCommand = "condor_submit " + shlex.quote(job_submit_file)
            print(submissionCommand)
            os.system(submissionCommand)


if __name__ == "__main__":
    main()
443 
def get_status_output(args, kwargs)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
Definition: TracksSoA.h:90
def replace(string, replacements)
def mkdir_eos(out_path)
method to create recursively directories on EOS
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
Definition: main.py:1
bool decode(bool &, std::string_view)
Definition: types.cc:72
#define str(s)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)