CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
submitPVResolutionJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Submits per run Primary Vertex Resoltion Alignment validation using the split vertex method,
4 usage:
5 
6 submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
7 '''
8 
9 from __future__ import print_function
10 
11 __author__ = 'Marco Musich'
12 __copyright__ = 'Copyright 2020, CERN CMS'
13 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
14 __license__ = 'Unknown'
15 __maintainer__ = 'Marco Musich'
16 __email__ = 'marco.musich@cern.ch'
17 __version__ = 1
18 
19 import os,sys
20 import getopt
21 import time
22 import json
23 import ROOT
24 import urllib
25 import string
26 import subprocess
27 import pprint
28 import warnings
29 from subprocess import Popen, PIPE
30 import multiprocessing
31 from optparse import OptionParser
32 import os, shlex, shutil, getpass
33 import configparser as ConfigParser
34 
# Banner printed at startup (kept byte-identical to the historical version).
CopyRights = ('##################################\n'
              '# submitPVVResolutioJobs.py #\n'
              '# marco.musich@cern.ch #\n'
              '# October 2020 #\n'
              '##################################\n')
40 
##############################################
def get_status_output(*args, **kwargs):
##############################################
    """Run a subprocess and return the triple (returncode, stdout, stderr)."""
    proc = subprocess.Popen(*args, **kwargs)
    out, err = proc.communicate()
    return proc.returncode, out, err
47 
##############################################
def check_proxy():
##############################################
    """Check if a GRID proxy has been initialized.

    Returns True when ``voms-proxy-info --exists`` succeeds, False otherwise.
    OSError is caught as well so that hosts without the VOMS tools installed
    report "no proxy" instead of crashing with FileNotFoundError.
    """
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=dump, stderr=dump)
    except (subprocess.CalledProcessError, OSError):
        return False
    return True
60 
##############################################
def forward_proxy(rundir):
##############################################
    """Copy the user GRID proxy to a location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    # refuse to continue without a valid proxy: the jobs would all fail
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    proxy_location = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(proxy_location, os.path.join(rundir, ".user_proxy"))
75 
##############################################
def getFilesForRun(blob):
##############################################
    """Return the list of files of a given dataset for a certain run.

    `blob` is a (run, dataset) pair, so the function can be used with
    multiprocessing.Pool.map.
    """
    run, dataset = blob[0], blob[1]
    query = ' dasgoclient -limit=0 -query \'file run='+run+' dataset='+dataset+'\''
    proc = Popen(query, shell=True, stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate()
    files = stdout.decode().split('\n')
    files.pop()  # drop the empty entry produced by the trailing newline
    return files
89 
##############################################
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
##############################################
    """Write a 'job_<name>.submit' HTCondor file into `path` and return its path.

    Arguments:
    - `path`: job directory
    - `name`: base name of the per-job shell scripts ("<name>_$(ProcId).sh")
    - `nruns`: number of queued jobs
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """
    template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    # the proxy line is only emitted when a proxy forward was requested
    if proxy_path is not None:
        template += """\
+x509userproxy = "{proxy:s}"
"""

    submit_file = os.path.join(path, "job_"+name+".submit")
    content = template.format(script=os.path.join(path, name+"_$(ProcId).sh"),
                              out=name+"_$(ProcId)",
                              jobm=os.path.abspath(path),
                              flavour="tomorrow",
                              njobs=str(nruns),
                              proxy=proxy_path)
    with open(submit_file, "w") as handle:
        handle.write(content)

    return submit_file
125 
##############################################
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
##############################################
    """Return a {run: recorded luminosity (/pb)} dict from a brilcalc CSV query.

    Expects brilcalc output like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
    +-------+------+--------+--------+-------------------+------------------+
    and extracts the total recorded luminosity per run.

    Arguments:
    - `homedir`: home directory hosting the .local brilcalc installation
    - `minRun`, `maxRun`: run range passed to brilcalc
    - `isRunBased`: when False, skip the query and return an empty dict
    - `verbose`: print the raw output and the resulting cache

    Returns an empty dict when the query is disabled or fails.
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        ## using normtag
        #output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS", "--normtag","/cvmfs/cms-bril.cern.ch/cms-lumi-pog/Normtags/normtag_PHYSICS.json", "-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])

        ## no normtag
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
    except Exception:
        # narrowed from a bare `except:`, which would also have swallowed
        # SystemExit and KeyboardInterrupt
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip comment/header rows and blank lines (a blank line used to
        # insert a spurious {'': ''} entry into the cache)
        if ("#" in line) or (not line.strip()):
            continue
        fields = line.split(",")
        runToCache = fields[0].split(":")[0]
        lumiToCache = fields[-1].replace("\r", "")
        myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
166 
##############################################
def isInJSON(run, jsonfile):
##############################################
    """Return True if `run` appears as a key in the JSON lumi mask `jsonfile`.

    Falls back to True (i.e. "use the run") when the mask file is missing,
    unreadable or malformed.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: malformed JSON
        # (json.JSONDecodeError). Narrowed from a bare `except:` which would
        # also have hidden programming errors such as NameError.
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
177 
#######################################################
def as_dict(config):
#######################################################
    """Convert a ConfigParser object into a plain nested dictionary
    {section: {option: value}}."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
188 
189 #######################################################
190 def batchScriptCERN(theCMSSW_BASE,runindex, eosdir,lumiToRun,key,config):
191 #######################################################
192  '''prepare the batch script, to run on HTCondor'''
193  script = """#!/bin/bash
194 source /afs/cern.ch/cms/caf/setup.sh
195 CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
196 echo "the mother directory is $CMSSW_DIR"
197 export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
198 #OUT_DIR=$CMSSW_DIR/harvest ## for local storage
199 OUT_DIR={MYDIR}
200 LOG_DIR=$CMSSW_DIR/out
201 LXBATCH_DIR=`pwd`
202 cd $CMSSW_DIR
203 eval `scram runtime -sh`
204 cd $LXBATCH_DIR
205 cp -pr $CMSSW_DIR/cfg/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
206 cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
207 ls -lh .
208 #for payloadOutput in $(ls *root ); do cp $payloadOutput $OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
209 for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
210 tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
211 for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
212 """.format(CMSSW_BASE_DIR=theCMSSW_BASE,
213  runindex=runindex,
214  MYDIR=eosdir,
215  KEY=key,
216  LUMITORUN=lumiToRun,
217  GT=config['globaltag'],
218  EXT="external="+config['external'] if 'external' in config.keys() else "",
219  REC="records="+config['records'] if 'records' in config.keys() else "")
220 
221  return script
222 
#######################################################
# method to create recursively directories on EOS
#######################################################
def mkdir_eos(out_path):
    """Create `out_path` on EOS, issuing one `eos mkdir` per path level.

    Directory creation only starts once the accumulated path contains
    'test_out', so no mkdir is ever issued near the top of the EOS tree.
    Afterwards the path is listed to verify it exists; the (empty) stdout
    is printed when that check fails.
    """
    print("creating", out_path)
    newpath = '/'
    for level in out_path.split('/'):  # renamed from `dir`, which shadowed the builtin
        newpath = os.path.join(newpath, level)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            command = "eos mkdir "+newpath
            p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()  # communicate() already waits for termination
            #print(out,err)

    # now check that the directory exists
    command2 = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path
    p = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    if p.returncode != 0:
        print(out)
246 
##############################################
def main():
##############################################
    """Parse the command line and the .ini configuration, then create (and
    optionally submit) one HTCondor PV-resolution job per selected run and
    per configuration key."""

    # command-line interface (optparse kept for backward compatibility)
    desc="""This is a description of %prog."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
    parser.add_option('-i','--init', help='ini file', dest='iniPathName', action='store', default="default.ini")
    parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
    parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET', action='store', default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?', dest='isUnitTest', action='store_true', default=False)
    (opts, args) = parser.parse_args()

    global CopyRights
    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    ## prepare the eos output directory

    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    ## parse the configuration file

    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        # NOTE(review): WrongIniFormatError is not defined anywhere in this
        # file, so this raise would itself fail with a NameError -- confirm
        # where this exception class is supposed to come from.
        raise WrongIniFormatError(e)

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if(not bool(inputDict)):
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    ## check first there is a valid grid proxy
    forward_proxy(".")

    # ask DAS for the list of runs in the dataset
    #runs = commands.getstatusoutput("dasgoclient -query='run dataset="+opts.DATASET+"'")[1].split("\n")
    runs = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
    runs.pop()  # drop the empty entry coming from the trailing newline
    runs.sort()
    print("\n\n Will run on the following runs: \n",runs)

    # working area: cfg files, bash scripts, harvesting output and logs
    if(not os.path.exists("cfg")):
        os.system("mkdir cfg")
        os.system("mkdir BASH")
        os.system("mkdir harvest")
        os.system("mkdir out")

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")

    runs.sort()

    ## check that the list of runs is not empty
    if(len(runs)==0):
        if(opts.isUnitTest):
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run.... please check again the configuration')
    else:
        # get from the DB the int luminosities
        myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)

    if(opts.verbose):
        pprint.pprint(myLumiDB)

    lumimask = inputDict["Input"]["lumimask"]
    print("\n\n Using JSON file:",lumimask)

    mytuple=[]
    print("\n\n First run:",opts.start,"last run:",opts.end)

    # select the runs inside [start, end] that pass the JSON lumi mask
    for run in runs:
        if (int(run)<int(opts.start) or int(run)>int(opts.end)):
            print("excluding run",run)
            continue

        if not isInJSON(run,lumimask):
            continue

        else:
            print("'======> taking run",run)
            mytuple.append((run,opts.DATASET))

    #print mytuple

    # query DAS in parallel for the file list of each selected run
    pool = multiprocessing.Pool(processes=20) # start 20 worker processes
    count = pool.map(getFilesForRun,mytuple)
    # NOTE(review): `count` has one entry per *selected* run (mytuple) but is
    # zipped against the full `runs` list -- the run/file pairing looks wrong
    # whenever any run was excluded above; confirm intended behaviour.
    file_info = dict(zip(runs, count))

    if(opts.verbose):
        print(file_info)

    count=0
    for run in runs:
        count=count+1
        #if(count>10):
        #    continue
        #run = run.strip("[").strip("]")

        if (int(run)<int(opts.start) or int(run)>int(opts.end)):
            print("excluding",run)
            continue

        if not isInJSON(run,lumimask):
            print("=====> excluding run:",run)
            continue

        files = file_info[run]
        if(opts.verbose):
            print(run, files)
        # build a python-list literal with the input files, substituted into
        # the cfg template below via sed
        listOfFiles='['
        for ffile in files:
            listOfFiles=listOfFiles+"\""+str(ffile)+"\","
        listOfFiles+="]"

        #print(listOfFiles)

        # integrated luminosity for this run; defaults to 1/pb when unknown
        theLumi='1'
        if (run) in myLumiDB:
            theLumi = myLumiDB[run]
            print("run",run," int. lumi:",theLumi,"/pb")
        else:
            print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
            theLumi='1'
            print("run",run," int. lumi:",theLumi,"/pb")

        # loop on the dictionary: one job per non-Input section
        for key, value in inputDict.items():
            #print(key,value)
            if "Input" in key:
                continue
            else:
                key = key.split(":", 1)[1]
                print("dealing with",key)

            # clone the template cfg and substitute files/run/key in place
            os.system("cp "+input_CMSSW_BASE+"/src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py ./cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
            os.system("sed -i 's|XXX_FILES_XXX|"+listOfFiles+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
            os.system("sed -i 's|XXX_RUN_XXX|"+run+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
            os.system("sed -i 's|YYY_KEY_YYY|"+key+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")

            # one bash script per (key, run), executed by HTCondor
            scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
            scriptFile = open(scriptFileName,'w')
            scriptFile.write(batchScriptCERN(input_CMSSW_BASE,run,eosdir,theLumi,key,value))
            scriptFile.close()
            #os.system('chmod +x %s' % scriptFileName)

    ## prepare the HTCondor submission files and eventually submit them
    for key, value in inputDict.items():
        if "Input" in key:
            continue
        else:
            key = key.split(":", 1)[1]

        job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)

        if opts.submit:
            os.system("chmod u+x "+bashdir+"/*.sh")
            submissionCommand = "condor_submit "+job_submit_file
            print(submissionCommand)
            os.system(submissionCommand)

###################################################
if __name__ == "__main__":
    main()
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
if(conf_.getParameter< bool >("UseStripCablingDB"))
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
bool decode(bool &, std::string const &)
Definition: types.cc:71
Definition: main.py:1
def mkdir_eos
method to create recursively directories on EOS
#define str(s)