mps_fire.py
#!/usr/bin/env python
# Submit jobs that are set up in the local mps database to the batch system.
#
# The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
# The jobname will be something like MP_2015.
# The queue name is derived from lib.classInfo.
# The program is theScript.sh located in each job directory.
# There may be other options such as -R (see man bsub for info).
#
# Usage:
#
# mps_fire.py [-a] [-m [-f]] [maxjobs]
# mps_fire.py -h
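#
# Examples (illustrative values; see the argparse setup below for all options):
#
# mps_fire.py 10          # submit up to 10 Mille jobs in SETUP state
# mps_fire.py -j 042      # submit only job042
# mps_fire.py -m          # submit the Pede merge job(s)
# mps_fire.py -m -f       # force the Pede submission despite failed Mille jobs
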
from __future__ import print_function
from builtins import range
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import os
import sys
import glob
import shutil
try:                       # cPickle exists only in Python 2
    import cPickle as pickle
except ImportError:
    import pickle
import subprocess
import re
import argparse

def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if not mps_tools.check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))


def write_HTCondor_submit_file_pede(path, script, config, lib):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `config`: cfg file
    - `lib`: MPS lib object
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

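    # job_flavour is the last element of the class string: e.g. for a class
    # 'htcondor_bigmem_espresso' (illustrative) it is 'espresso'; CERN HTCondor
    # job flavours range from 'espresso' (20 min) to 'nextweek' (1 week).
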
    job_submit_template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
    if "bigmem" in resources:
        job_submit_template += """\
+BigMemJob = True
+AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"

# automatically remove the job if the submitter has no permissions to run a BigMemJob
periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
"""
    job_submit_template += "\nqueue\n"

    print("Determine number of pede threads...")
    cms_process = mps_tools.get_process_object(os.path.join(path, config))
    pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
    n_threads = 1
    for option in pede_options:
        if "threads" in option:
            n_threads = option.replace("threads", "").strip()
            n_threads = max(int(x) for x in n_threads.split())
            break
    if n_threads > 16: n_threads = 16 # HTCondor machines have (currently) 16
                                      # cores, i.e. we ensure here that the job
                                      # would fit core-wise on one machine
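    # e.g. a pede option "threads 10 1" (illustrative) yields
    # n_threads = max(10, 1) = 10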

    print("Determine required disk space on remote host...")
    # determine usage by each file instead of whole directory as this is what
    # matters for the specified disk usage:
    spco = subprocess.check_output # to make code below less verbose
    opj = os.path.join             # ditto
    cmd = ["du", "--apparent-size"]
    disk_usage = [int(item.split()[0])
                  for directory in ("binaries", "monitors", "tree_files")
                  for item
                  in spco(cmd +
                          glob.glob(opj(lib.mssDir, directory, "*"))).splitlines()]
    disk_usage = sum(disk_usage)
    disk_usage *= 1.1 # reserve 10% additional space
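    # 'du --apparent-size' prints one "<size> <path>" line per argument, with
    # sizes in KiB (du's default unit, which matches HTCondor's request_disk
    # default), e.g. "123456  /eos/.../binaries/milleBinary001.dat" (illustrative)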

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file

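# For reference, a rendered pede 'job.submit' might look like this
# (illustrative values only):
#
#   universe = vanilla
#   executable = .../jobData/jobm/theScript.sh
#   output = .../jobData/jobm/STDOUT
#   error = .../jobData/jobm/STDOUT
#   log = .../jobData/jobm/HTCJOB
#   notification = Always
#   transfer_output_files = ""
#   request_memory = 32000M
#   request_disk = 1000000
#   request_cpus = 10
#   +JobFlavour = "nextweek"
#
#   queue
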
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `lib`: MPS lib object
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """

    resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""

+JobFlavour = "{flavour:s}"
"""
    if "cafalca" in resources:
        job_submit_template += """\
+CAFJob = True
+AccountingGroup = "group_u_CMS.CAF.ALCA"
# automatically remove the job if the submitter has no permissions to run a CAF Job
periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True
"""

    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += "\nqueue\n"

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           flavour = job_flavour,
                                           proxy = proxy_path))

    return job_submit_file


parser = argparse.ArgumentParser(
    description="Submit jobs that are set up in the local mps database to the batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest="job_id", nargs="*",
                    help=("job IDs to be submitted; "
                          "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help=("submit all Mille jobs in SETUP state; "
                          "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help=("submit all Pede jobs in SETUP state; "
                          "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state"))
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state. Unlike --force-merge "
                          "this option assumes the user has edited theScript.sh and "
                          "alignment_merge.py to consistently pick up only the mille "
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])


lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"): job_mask.append(job_id)
        elif job_id.startswith("m"): job_mask.append("job"+job_id)
        else:
            try:
                job_mask.append(lib.JOBDIR[int(job_id)-1])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be in one", end=' ')
            print("of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# set the job name
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

fire_htcondor = False

# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in $resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in $resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q '+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

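    # e.g. (illustrative) a 'cmscaf1nd' class becomes '-q cmscaf1nd -m g_cmscaf',
    # while any class containing 'htcondor' switches the submission to HTCondor
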
    nSub = 0 # number of submitted Jobs
    for i in range(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] == 'SETUP':
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData, lib.JOBDIR[i]))

                # submit a new job with 'bsub -J ...' and check output
                # for some reason LSF wants the script with full path
                if fire_htcondor:
                    Path = os.path.join(theJobData, lib.JOBDIR[i])
                    scriptPath = os.path.join(Path, "theScript.sh")
                    if args.forwardProxy:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib, os.path.join(Path, ".user_proxy"))
                    else:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
                    submission = "condor_submit -batch-name %s %s" % \
                                 (theJobName, job_submit_file)
                else:
                    submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                                 (theJobName, resources, theJobData, lib.JOBDIR[i])
                print(submission)
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True)
                except subprocess.CalledProcessError as e:
                    result = "" # -> check for successful job submission will fail
                print(' '+result, end=' ')
                result = result.strip()

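                # typical submission outputs matched below (illustrative IDs):
                #   HTCondor: "1 job(s) submitted to cluster 1234567."
                #   LSF:      "Job <123456> is submitted to queue <cmscaf1nd>."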
                # check if job was submitted and update the job database
                if fire_htcondor:
                    match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
                else:
                    match = re.search(r"Job <(\d+)> is submitted", result)
                if match:
                    # need standard format for job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                    if fire_htcondor: lib.JOBID[i] += ".0"
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i], result), end=' ')
                nSub += 1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all other jobs are OK
    mergeOK = True
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break

    # loop over merge jobs
    i = lib.nJobs
    while i < len(lib.JOBDIR):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check if the current job is in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job', jobNumFrom1, 'not submitted since Mille jobs error/unfinished (use -m -f to force).')
        else:
            # some paths for clarity
            Path = os.path.join(theJobData, lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            # force option invoked:
            if args.forceMerge:

                # make a backup copy of the script first, if it doesn't already exist
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of merge cfg file -> either the.py or alignment_merge.py
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.strip()
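                # e.g. a backup-script line 'CONFIG_FILE=.../alignment_merge.py'
                # (illustrative path) yields mergeCfg = 'alignment_merge.py'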

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = pickle.load(f)
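                # weight_conf is expected to be a list of (name, weight) string
                # pairs, e.g. [("Dataset1", "1.0"), ("Dataset2", "0.5")]
                # (illustrative values), as consumed by mps_weight.pl below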

                # blank weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply weights
                for name, weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as base config)
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command = 'mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # restore the backup copy of the script
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of merge cfg file
                command = "cat "+scriptPath+r" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.replace('\n','')

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # restore the backup copy of the cfg
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # end of if/else forceMerge


            # submit merge job
            nMerge = i-lib.nJobs # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                submission = ["bsub", "-J", curJobName, resources, scriptPath]
            for _ in range(5): # retry up to five times before giving up
                try:
                    result = subprocess.check_output(submission, stderr=subprocess.STDOUT)
                    break
                except subprocess.CalledProcessError as e:
                    result = e.output

            print(' '+result, end=' ')
            result = result.strip()

            # check if merge job was submitted and update the job database
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r"Job <(\d+)> is submitted", result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:', result, end=' ')

        i += 1
        # end of while on merge jobs


lib.write_db()