mps_fire.py
#!/usr/bin/env python
# Submit jobs that are setup in local mps database to batch system
#
# The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
# The jobname will be something like MP_2015.
# The queue name is derived from lib.classInfo.
# The program is theScript.sh located in each job directory.
# There may be other options such as -R (see man bsub for info).
#
# Usage:
#
# mps_fire.py [-a] [-m [-f]] [maxjobs]
# mps_fire.py -h
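#
# Examples (illustrative invocations, based on the options defined below):
#   mps_fire.py 10     # submit up to 10 Mille jobs that are in SETUP state
#   mps_fire.py -m     # submit the Pede merge job(s)
#   mps_fire.py -m -f  # force the merge even if some Mille jobs are not OK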

from __future__ import print_function
from builtins import range
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import os
import sys
import glob
import shutil
import cPickle
import subprocess
import re
import argparse

def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if not mps_tools.check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir,".user_proxy"))
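    # note: with --forward-proxy this ".user_proxy" copy is what the Mille
    # HTCondor submit file later references via its "+x509userproxy" line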


def write_HTCondor_submit_file_pede(path, script, config, lib):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `config`: cfg file
    - `lib`: MPS lib object
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]
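    # illustrative example: a pede class of "htcondor_bigmem_espresso" gives
    # resources = ["bigmem", "espresso"] and job_flavour = "espresso"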

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
    if "bigmem" in resources:
        job_submit_template += """\
+BigMemJob = True
+AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"

# automatically remove the job if the submitter has no permissions to run a BigMemJob
periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
"""
    job_submit_template += "\nqueue\n"

    print("Determine number of pede threads...")
    cms_process = mps_tools.get_process_object(os.path.join(Path, mergeCfg))
    pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
    n_threads = 1
    for option in pede_options:
        if "threads" in option:
            n_threads = option.replace("threads", "").strip()
            n_threads = max(map(lambda x: int(x), n_threads.split()))
            break
    if n_threads > 16: n_threads = 16 # HTCondor machines have (currently) 16
                                      # cores, i.e. we ensure here that the job
                                      # would fit core-wise on one machine
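    # example (illustrative): a steering option like "threads 10 1" yields
    # n_threads = 10, capped at 16 above; note that Path and mergeCfg used
    # here are globals set at the call site, not the path/config arguments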

    print("Determine required disk space on remote host...")
    # determine usage by each file instead of whole directory as this is what
    # matters for the specified disk usage:
    spco = subprocess.check_output # to make code below less verbose
    opj = os.path.join             # ditto
    cmd = ["du", "--apparent-size"]
    disk_usage = [int(item.split()[0])
                  for directory in ("binaries", "monitors", "tree_files")
                  for item
                  in spco(cmd+
                          glob.glob(opj(lib.mssDir, directory, "*"))).splitlines()]
    disk_usage = sum(disk_usage)
    disk_usage *= 1.1 # reserve 10% additional space
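    # note: "du --apparent-size" reports in 1 KiB blocks by default, which
    # should match the KiB unit HTCondor assumes for request_disk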

    job_submit_file = os.path.join(Path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file
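    # For illustration only: with hypothetical values the rendered job.submit
    # would contain lines roughly like
    #   executable = /abs/path/jobData/jobm/theScript.sh
    #   request_memory = 32000M
    #   request_cpus = 10
    #   +JobFlavour = "nextweek"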

def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `lib`: MPS lib object
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """

    resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""

+JobFlavour = "{flavour:s}"
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += "\nqueue\n"

    job_submit_file = os.path.join(Path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           flavour = job_flavour,
                                           proxy = proxy_path))

    return job_submit_file


parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
                    help = ("job IDs to be submitted; "
                            "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help = ("submit all setup Mille jobs; "
                            "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help = ("submit all setup Pede jobs; "
                            "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state. Unlike --force-merge "+
                          "this option assumes the user has edited theScript.sh and "+
                          "alignment_merge.py to consistently pick up only the Mille "+
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])


lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"): job_mask.append(job_id)
        elif job_id.startswith("m"): job_mask.append("job"+job_id)
        else:
            try:
                job_mask.append(lib.JOBDIR[int(job_id)-1])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# set the job name
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

fire_htcondor = False

# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q'+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    nSub = 0 # number of submitted Jobs
    for i in range(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] == 'SETUP':
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))

                # submit a new job with 'bsub -J ...' and check output
                # for some reason LSF wants the script with full path
                if fire_htcondor:
                    Path = os.path.join(theJobData,lib.JOBDIR[i])
                    scriptPath = os.path.join(Path, "theScript.sh")
                    if args.forwardProxy:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib, os.path.join(Path,".user_proxy"))
                    else:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
                    submission = "condor_submit -batch-name %s %s"%\
                        (theJobName, job_submit_file)
                else:
                    submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                        (theJobName, resources, theJobData, lib.JOBDIR[i])
                print(submission)
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True)
                except subprocess.CalledProcessError as e:
                    result = "" # -> check for successful job submission will fail
                print(' '+result, end=' ')
                result = result.strip()

                # check if the job was submitted and update the job database
                if fire_htcondor:
                    match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
                else:
                    match = re.search(r'Job <(\d+)> is submitted', result)
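                # (illustrative) typical scheduler replies these patterns match:
                #   HTCondor: "1 job(s) submitted to cluster 12345."
                #   LSF:      "Job <12345> is submitted to queue <cmscaf1nd>."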
                if match:
                    # need standard format for job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                    if fire_htcondor: lib.JOBID[i] += ".0"
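                    # the appended ".0" is the HTCondor process number, so the
                    # stored ID follows the usual "cluster.process" form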
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
                nSub +=1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all other jobs are OK
    mergeOK = True
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break

    # loop over merge jobs
    i = lib.nJobs
    while i<len(lib.JOBDIR):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check if the current job is in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
        else:
            # some paths for clarity
            Path = os.path.join(theJobData,lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            # force option invoked:
            if args.forceMerge:

                # make a backup copy of the script first, if it doesn't already exist
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of the merge cfg file -> either the.py or alignment_merge.py
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.strip()

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = cPickle.load(f)
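                # (assumption) weight_conf is a list of (name, weight) string
                # pairs, matching the mps_weight.pl calls below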

                # blank weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply weights
                for name,weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # restore the backup copy of the script
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of the merge cfg file
                command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.replace('\n','')

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # restore the backup copy of the cfg
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # end of if/else forceMerge

            # submit merge job
            nMerge = i-lib.nJobs # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                submission = ["bsub", "-J", curJobName, resources, scriptPath]
            for _ in range(5):
                try:
                    result = subprocess.check_output(submission, stderr=subprocess.STDOUT)
                    break
                except subprocess.CalledProcessError as e:
                    result = e.output

            print(' '+result, end=' ')
            result = result.strip()

            # check if the merge job was submitted and update the job database
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r'Job <(\d+)> is submitted', result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:',result, end=' ')

        i +=1
        # end of while on merge jobs


lib.write_db()