CMS 3D CMS Logo

submitPVResolutionJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Submits per-run Primary Vertex Resolution alignment validation jobs using the split vertex method,
4 usage:
5 
6 submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
7 '''
8 
9 from __future__ import print_function
10 
11 __author__ = 'Marco Musich'
12 __copyright__ = 'Copyright 2020, CERN CMS'
13 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
14 __license__ = 'Unknown'
15 __maintainer__ = 'Marco Musich'
16 __email__ = 'marco.musich@cern.ch'
17 __version__ = 1
18 
19 import os,sys
20 import getopt
21 import time
22 import json
23 import ROOT
24 import urllib
25 import string
26 import subprocess
27 import pprint
28 import warnings
29 from subprocess import Popen, PIPE
30 import multiprocessing
31 from optparse import OptionParser
32 import os, shlex, shutil, getpass
33 import configparser as ConfigParser
34 
# Banner printed by main() at start-up.  Every row is padded to the same
# width so that the box lines up, and the script name matches the file name.
_BANNER_WIDTH = 34
_BANNER_ROWS = ('submitPVResolutionJobs.py', 'marco.musich@cern.ch', 'October 2020')

CopyRights = '#' * _BANNER_WIDTH + '\n'
CopyRights += ''.join('#' + _row.center(_BANNER_WIDTH - 2) + '#\n' for _row in _BANNER_ROWS)
CopyRights += '#' * _BANNER_WIDTH + '\n'
40 
41 
def get_status_output(*args, **kwargs):
    """Run a command via ``subprocess.Popen`` and wait for it to finish.

    All positional and keyword arguments are forwarded to
    ``subprocess.Popen`` unchanged.

    Returns:
        A tuple ``(returncode, stdout, stderr)`` where the two streams are
        exactly what ``Popen.communicate()`` returned.
    """
    process = subprocess.Popen(*args, **kwargs)
    captured_out, captured_err = process.communicate()
    return (process.returncode, captured_out, captured_err)
47 
48 
def check_proxy():
    """Check if GRID proxy has been initialized.

    Runs ``voms-proxy-info --exists`` with its output discarded.

    Returns:
        True if the command exits with status 0.  False if it exits with a
        non-zero status, or if it cannot be started at all (for instance
        because the VOMS client tools are not on the PATH).
    """
    try:
        subprocess.check_call(["voms-proxy-info", "--exists"],
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: the tool ran and reported "no valid proxy".
        # OSError: the tool could not be executed, so no proxy can be confirmed.
        return False
    return True
60 
61 
def forward_proxy(rundir):
    """Copy the current GRID proxy into `rundir` as ``.user_proxy``.

    The copy is meant to sit in a location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy

    If `check_proxy()` reports that no proxy is available, a hint is printed
    and the program exits with status 1.
    """
    if check_proxy():
        proxy_location = subprocess.check_output(["voms-proxy-info", "--path"])
        destination = os.path.join(rundir, ".user_proxy")
        # strip() removes the surrounding whitespace of the command output
        shutil.copyfile(proxy_location.strip(), destination)
        return

    print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
    sys.exit(1)
75 
76 
def getFilesForRun(blob):
    """Return the list of files of a dataset that belong to a given run.

    Arguments:
    - `blob`: sequence whose first element is the run number and whose
      second element is the dataset name (a single argument is used so the
      function can be mapped over a list of tuples).

    The query is sent to ``dasgoclient``; each non-empty line of its
    standard output becomes one entry of the returned list.  An empty list
    is returned when the command prints nothing or cannot be started.
    """
    run, dataset = blob[0], blob[1]
    query = "file run={} dataset={}".format(run, dataset)

    # Pass the arguments as a list (no shell) so that the query text can
    # never be re-interpreted or broken up by shell quoting rules.
    try:
        proc = Popen(["dasgoclient", "-limit=0", "-query", query],
                     stdout=PIPE, stderr=PIPE)
    except OSError:
        # dasgoclient not available: same outcome as an empty answer
        return []

    out, _ = proc.communicate()
    # Keep every non-blank line; this does not depend on whether the
    # output ends with a newline.
    return [line for line in out.decode().splitlines() if line.strip()]
89 
90 
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Writes the 'job_<name>.submit' file in `path`.

    Arguments:
    - `path`: job directory; also used (as absolute path) for the
      .out/.err/.log files of the jobs
    - `name`: base name of the job; the executable of each job is
      `<path>/<name>_$(ProcId).sh`
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the path of the submit file that was written.
    """

    job_submit_template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
"""
    if proxy_path is not None:
        # must come before 'queue': submit commands placed after the
        # queue statement are not applied to the jobs it creates
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += """\
queue {njobs:s}
"""

    job_submit_file = os.path.join(path, "job_"+name+".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.join(path,name+"_$(ProcId).sh"),
                                           out = name+"_$(ProcId)",
                                           jobm = os.path.abspath(path),
                                           flavour = "tomorrow",
                                           njobs = str(nruns),
                                           proxy = proxy_path))

    return job_submit_file
125 
126 
def getLuminosity(homedir,minRun,maxRun,isRunBased,verbose):
    """Query brilcalc for the luminosity of each run in [minRun, maxRun].

    Runs `<homedir>/.local/bin/brilcalc lumi` for STABLE BEAMS, in units of
    /pb, with csv output.  Every output line that does not contain '#' is
    parsed as comma-separated values: the part of the first field before
    the first ':' is used as key and the last field as value
    (presumably 'run:fill,...,recorded' -- confirm against brilcalc).

    Arguments:
    - `homedir`: home directory below which brilcalc is installed
    - `minRun`, `maxRun`: first and last run passed to --begin / --end
    - `isRunBased`: if false nothing is queried and an empty dict is returned
    - `verbose`: print the raw brilcalc output and the resulting dictionary

    Returns a dict {run (str): luminosity (str)}; it is empty when the query
    is skipped or fails (a warning is issued in the latter case).
    """
    myCachedLumi={}
    if(not isRunBased):
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
    except Exception:
        # best effort: any failure to run the query only disables the lumi
        # lookup, but Ctrl-C / SystemExit are no longer swallowed
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if(verbose):
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip comment/header lines and blank lines (e.g. the empty string
        # that follows the final newline), which carry no run information
        if "#" in line or not line.strip():
            continue
        fields = line.split(",")
        runToCache  = fields[0].split(":")[0]
        lumiToCache = fields[-1].replace("\r", "")
        myCachedLumi[runToCache] = lumiToCache

    if(verbose):
        print(myCachedLumi)
    return myCachedLumi
166 
167 
def isInJSON(run,jsonfile):
    """Tell whether `run` is listed in the lumi-mask JSON file.

    Arguments:
    - `run`: run number; it is converted to a string before the lookup
      because keys loaded from a JSON object are always strings
    - `jsonfile`: path of the JSON file to load

    Returns True if the run is found.  If the file cannot be opened,
    cannot be parsed or does not hold a container, a warning is issued and
    True is returned, i.e. no run is filtered out.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (str(run) in jsonDATA)
    except (OSError, ValueError, TypeError):
        # OSError: unreadable file; ValueError: malformed JSON;
        # TypeError: invalid path or non-container JSON content
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
176 
177 
def as_dict(config):
    """Turn a parsed configuration into nested plain dictionaries.

    The result maps every section name of `config` to a dictionary of its
    options, each value being read with ``config.get(section, option)``.
    """
    return {
        section_name: {
            option_name: config.get(section_name, option_name)
            for option_name in config.options(section_name)
        }
        for section_name in config.sections()
    }
187 
188 
def batchScriptCERN(theCMSSW_BASE,runindex, eosdir,lumiToRun,key,config):
    '''Return the text of the bash script that runs one job on HTCondor.

    Arguments:
    - `theCMSSW_BASE`: base directory of the CMSSW area; the working
      directory of the job is <base>/src/Alignment/OfflineValidation/test
    - `runindex`: run identifier used in the cfg, log and output file names
    - `eosdir`: value assigned to OUT_DIR in the script (xrdcp destination)
    - `lumiToRun`: value passed to cmsRun as lumi=
    - `key`: label used in the cfg, log and output file names
    - `config`: mapping that must contain 'globaltag' and may contain
      'records' and 'external', which are forwarded to cmsRun
    '''
    # optional cmsRun arguments: empty unless configured
    recordsArg = ""
    if 'records' in config:
        recordsArg = "records=" + config['records']
    externalArg = ""
    if 'external' in config:
        externalArg = "external=" + config['external']

    substitutions = {
        'base': theCMSSW_BASE,
        'run': runindex,
        'outdir': eosdir,
        'label': key,
        'lumi': lumiToRun,
        'tag': config['globaltag'],
        'records': recordsArg,
        'external': externalArg,
    }

    template = """#!/bin/bash
source /afs/cern.ch/cms/caf/setup.sh
CMSSW_DIR={base}/src/Alignment/OfflineValidation/test
echo "the mother directory is $CMSSW_DIR"
export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
#OUT_DIR=$CMSSW_DIR/harvest ## for local storage
OUT_DIR={outdir}
LOG_DIR=$CMSSW_DIR/out
LXBATCH_DIR=`pwd`
cd $CMSSW_DIR
eval `scram runtime -sh`
cd $LXBATCH_DIR
cp -pr $CMSSW_DIR/cfg/PrimaryVertexResolution_{label}_{run}_cfg.py .
cmsRun PrimaryVertexResolution_{label}_{run}_cfg.py GlobalTag={tag} lumi={lumi} {records} {external} >& log_{label}_run{run}.out
ls -lh .
#for payloadOutput in $(ls *root ); do cp $payloadOutput $OUT_DIR/pvresolution_{label}_{run}.root ; done
for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{label}_{run}.root ; done
tar czf log_{label}_run{run}.tgz log_{label}_run{run}.out
for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
"""
    return template.format(**substitutions)
221 
222 
def mkdir_eos(out_path):
    """Create `out_path` on EOS one directory level at a time.

    The path is rebuilt component by component starting from '/'; an
    `eos mkdir` is issued for every partial path that contains 'test_out'
    (the levels above it are left alone).  Afterwards `out_path` is listed
    with `eos.select ls` and, if that command fails, its standard output is
    printed.
    """
    print("creating",out_path)
    partial_path = '/'
    for component in out_path.split('/'):
        partial_path = os.path.join(partial_path, component)
        # do not issue mkdir from very top of the tree
        if partial_path.find('test_out') <= 0:
            continue
        mkdir_proc = subprocess.Popen("eos mkdir "+partial_path, shell=True,
                                      stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        mkdir_proc.communicate()
        mkdir_proc.wait()

    # now check that the directory exists
    list_proc = subprocess.Popen("/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path,
                                 shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    listing, _ = list_proc.communicate()
    list_proc.wait()
    if list_proc.returncode != 0:
        print(listing)
246 
def main():
    """Prepare (and with -s submit) one HTCondor job per selected run.

    Steps, in order:
    1. parse the command line and print the banner;
    2. with -s, create the EOS output directory below
       /store/group/alca_trackeralign/<USER>/test_out/<taskname>;
    3. read the ini file (sections other than 'Input' are expected to be
       named '<something>:<key>' and describe one validation each);
    4. forward the GRID proxy to the current directory;
    5. ask DAS for the runs of the dataset, keep those inside [begin, end]
       that pass the lumi mask, and fetch their file lists in parallel;
    6. for every selected run and validation write a cmsRun configuration
       in ./cfg and a job script in ./BASH;
    7. write one HTCondor submit file per validation and, with -s, submit it.

    Job scripts are numbered 0..N-1 over the selected runs only, and N is
    the number of jobs queued, so that every $(ProcId) has its script.
    """

    desc="""This is a description of %prog."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
    parser.add_option('-i','--init', help='ini file', dest='iniPathName', action='store', default="default.ini")
    parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
    parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET', action='store', default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?', dest='isUnitTest', action='store_true', default=False)
    (opts, _) = parser.parse_args()

    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    # prepare the EOS output directory
    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    # parse the ini file
    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        # report the malformed file in the same way as an unparsable one below
        raise SystemExit("\n\n ERROR! Wrong format of the ini file: "+str(e)+" \n\n") from e

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if(not bool(inputDict)):
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    forward_proxy(".")

    # one run per non-empty line of the DAS answer
    dasOutput = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode()
    runs = sorted(line.strip() for line in dasOutput.split("\n") if line.strip())
    print("\n\n Will run on the following runs: \n",runs)

    # create each working directory independently of the others
    for workdir in ("cfg", "BASH", "harvest", "out"):
        os.makedirs(workdir, exist_ok=True)

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")

    if(len(runs)==0):
        if(opts.isUnitTest):
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run.... please check again the configuration')

    # get from the DB the int luminosities
    myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)
    if(opts.verbose):
        pprint.pprint(myLumiDB)

    lumimask = inputDict["Input"]["lumimask"]
    print("\n\n Using JSON file:",lumimask)

    print("\n\n First run:",opts.start,"last run:",opts.end)

    # select the runs once; everything below works on this list only
    selectedRuns = []
    for run in runs:
        if (int(run)<int(opts.start) or int(run)>int(opts.end)):
            print("excluding run",run)
            continue

        if not isInJSON(run,lumimask):
            print("=====> excluding run:",run)
            continue

        print("'======> taking run",run)
        selectedRuns.append(run)

    if not selectedRuns:
        print("\n\n No run passed the selection: nothing to prepare or submit.")
        return

    # query the file lists in parallel and key them by the run they were
    # requested for (the results come back in the order of selectedRuns)
    with multiprocessing.Pool(processes=20) as pool:  # start 20 worker processes
        fileLists = pool.map(getFilesForRun,[(run,opts.DATASET) for run in selectedRuns])
    file_info = dict(zip(selectedRuns, fileLists))

    if(opts.verbose):
        print(file_info)

    # validations to run: every section but the 'Input' one, labelled by
    # the part of the section name that follows the first ':'
    validations = [(section.split(":", 1)[1], settings)
                   for section, settings in inputDict.items()
                   if "Input" not in section]

    for jobIndex, run in enumerate(selectedRuns):
        files = file_info[run]
        if(opts.verbose):
            print(run, files)
        listOfFiles = '[' + ''.join("\""+str(ffile)+"\"," for ffile in files) + ']'

        if run in myLumiDB:
            theLumi = myLumiDB[run]
        else:
            print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
            theLumi = '1'
        print("run",run," int. lumi:",theLumi,"/pb")

        for key, value in validations:
            print("dealing with",key)

            # instantiate the cmsRun configuration from the template
            cfgName = "PrimaryVertexResolution_"+key+"_"+run+"_cfg.py"
            cfgPath = cwd+"/cfg/"+cfgName
            os.system("cp "+input_CMSSW_BASE+"/src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py ./cfg/"+cfgName)
            os.system("sed -i 's|XXX_FILES_XXX|"+listOfFiles+"|g' "+cfgPath)
            os.system("sed -i 's|XXX_RUN_XXX|"+run+"|g' "+cfgPath)
            os.system("sed -i 's|YYY_KEY_YYY|"+key+"|g' "+cfgPath)

            # the index in the file name is matched by $(ProcId) at submission
            scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(jobIndex)+".sh")
            with open(scriptFileName,'w') as scriptFile:
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE,run,eosdir,theLumi,key,value))

    for key, _ in validations:
        job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,len(selectedRuns),None)

        if opts.submit:
            os.system("chmod u+x "+bashdir+"/*.sh")
            submissionCommand = "condor_submit "+job_submit_file
            print(submissionCommand)
            os.system(submissionCommand)
426 
427 if __name__ == "__main__":
428  main()
429 
def get_status_output(args, kwargs)
def batchScriptCERN(theCMSSW_BASE, runindex, eosdir, lumiToRun, key, config)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def replace(string, replacements)
def mkdir_eos(out_path)
method to create recursively directories on EOS
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
bool decode(bool &, std::string const &)
Definition: types.cc:71
Definition: main.py:1
#define str(s)
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)