CMS 3D CMS Logo

mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub sytax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScrip.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 from __future__ import print_function
16 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
17 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
18 import os
19 import sys
20 import glob
21 import shutil
22 import cPickle
23 import subprocess
24 import re
25 import argparse
26 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if mps_tools.check_proxy():
        # ask VOMS where the local proxy file lives and copy it into the
        # job directory under the fixed name expected by the batch wrappers
        proxy_source = subprocess.check_output(
            ["voms-proxy-info", "--path"]).strip()
        shutil.copyfile(proxy_source, os.path.join(rundir, ".user_proxy"))
    else:
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)
41 
def write_HTCondor_submit_file_pede(path, script, config, lib):
    """Writes 'job.submit' file in `path` for the pede (merge) job.

    Returns the path of the written submit file.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `config`: cfg file
    - `lib`: MPS lib object
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
    if "bigmem" in resources:
        job_submit_template += """\
+BigMemJob = True
+AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"

# automatically remove the job if the submitter has no permissions to run a BigMemJob
periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
"""
    job_submit_template += "\nqueue\n"

    print("Determine number of pede threads...")
    # FIX: use the `path`/`config` parameters instead of relying on the
    # global variables `Path`/`mergeCfg` that happen to be set by the
    # script section at call time.
    cms_process = mps_tools.get_process_object(os.path.join(path, config))
    pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
    n_threads = 1
    for option in pede_options:
        if "threads" in option:
            # e.g. "threads 10 1" -> take the largest number given
            n_threads = option.replace("threads", "").strip()
            n_threads = max(map(int, n_threads.split()))
            break
    if n_threads > 16: n_threads = 16 # HTCondor machines have (currently) 16
                                      # cores, i.e. we ensure here that the job
                                      # would fit core-wise on one machine

    print("Determine required disk space on remote host...")
    # determine usage by each file instead of whole directory as this is what
    # matters for the specified disk usage:
    spco = subprocess.check_output # to make code below less verbose
    opj = os.path.join # ditto
    cmd = ["du", "--apparent-size"]
    disk_usage = [int(item.split()[0])
                  for directory in ("binaries", "monitors", "tree_files")
                  for item
                  in spco(cmd+
                          glob.glob(opj(lib.mssDir, directory, "*"))).splitlines()]
    disk_usage = sum(disk_usage)
    disk_usage *= 1.1 # reserve 10% additional space

    # FIX: write into the `path` given by the caller, not the global `Path`
    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file
120 
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
    """Writes 'job.submit' file in `path` for a mille job.

    Returns the path of the written submit file.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `lib`: MPS lib object
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """

    resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""

+JobFlavour = "{flavour:s}"
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += "\nqueue\n"

    # FIX: use the `path` parameter instead of the global `Path` that is
    # only defined later in the script section.
    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        # `proxy` is simply ignored by format() when the template
        # does not contain the x509userproxy line
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           flavour = job_flavour,
                                           proxy = proxy_path))

    return job_submit_file
159 
160 
161 
# ---------------------------------------------------------------------------
# Command-line interface; the resulting `args` namespace drives the whole
# submission logic below.
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
# positional: how many Mille jobs to fire in this invocation
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
                    help = ("job IDs to be submitted; "
                            "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help = ("submit all setup Mille jobs; "
                            "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help = ("submit all setup Pede jobs; "
                            "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state. Unlike --forceMerge "+
                          "this option assumes the user has edited theScript.sh and "+
                          "alignment_merge.py to consistently pick up only the mille "+
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
193 
194 
# read the local mps database; it provides JOBDIR/JOBSTATUS/JOBID lists
# with the mille jobs first and the merge ("jobm*") entries appended
lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

# `job_mask` is the list of job directory names eligible for submission
if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        # accepted spellings: 'job042', 'm1' (-> 'jobm1'), or a plain index '042'
        if job_id.startswith("job"): job_mask.append(job_id)
        elif job_id.startswith("m"): job_mask.append("job"+job_id)
        else:
            try:
                # plain numeric ID: 1-based index into JOBDIR
                job_mask.append(lib.JOBDIR[int(job_id)-1])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        # NOTE: short-circuit keeps job_mask[-1] safe when invalid_id is True
        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)
229 
# build the absolute job directory path (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# set the job name: default 'mpalign', overridden by the campaign name
# stored in the database (lib.addFiles) when present
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

# switched to True below when the mps.db resource class requests HTCondor
fire_htcondor = False
239 
# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    #set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in $resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in $resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q'+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    nSub = 0 # number of submitted Jobs
    for i in xrange(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        # only jobs still in 'SETUP' state are candidates for submission
        if lib.JOBSTATUS[i] == 'SETUP':
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))

                # submit a new job with 'bsub -J ...' and check output
                # for some reasons LSF wants script with full path
                if fire_htcondor:
                    Path = os.path.join(theJobData,lib.JOBDIR[i])
                    scriptPath = os.path.join(Path, "theScript.sh")
                    if args.forwardProxy:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib,os.path.join(Path,".user_proxy"))
                    else:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
                    submission = "condor_submit -batch-name %s %s"%\
                        (theJobName, job_submit_file)
                else:
                    submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                        (theJobName, resources, theJobData, lib.JOBDIR[i])
                print(submission)
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True)
                except subprocess.CalledProcessError as e:
                    result = "" # -> check for successful job submission will fail
                print(' '+result, end=' ')
                result = result.strip()

                # check if job was submitted and updating jobdatabase
                if fire_htcondor:
                    match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
                else:
                    match = re.search('Job <(\d+)> is submitted', result)
                if match:
                    # need standard format for job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                    # HTCondor job IDs are '<cluster>.<proc>'; single-proc -> '.0'
                    if fire_htcondor: lib.JOBID[i] += ".0"
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
                # NOTE: counted even on failed submission, so maxJobs bounds
                # the number of *attempts* per invocation
                nSub +=1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all other jobs are OK (DISABLED jobs don't block the merge)
    mergeOK = True
    for i in xrange(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break

    # loop over merge jobs (entries after the first lib.nJobs mille jobs)
    i = lib.nJobs
    while i<len(lib.JOBDIR):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check if current job in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
        else:
            # some paths for clarity
            Path = os.path.join(theJobData,lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            # force option invoked: rewrite script/cfg to use only 'OK' jobs
            if args.forceMerge:

                # make a backup copy of the script first, if it doesn't already exist.
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of merge cfg file -> either the.py or alignment_merge.py
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.strip()

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = cPickle.load(f)

                # blank weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply weights
                for name,weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # no force: restore the backup copy of the script
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of merge cfg file
                command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.replace('\n','')

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # restore the backup copy of the cfg
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # end of if/else forceMerge

            # submit merge job
            nMerge = i-lib.nJobs # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                submission = ["bsub", "-J", curJobName, resources, scriptPath]
            # retry submission up to 5 times before giving up
            for _ in xrange(5):
                try:
                    result = subprocess.check_output(submission, stderr=subprocess.STDOUT)
                    break
                except subprocess.CalledProcessError as e:
                    result = e.output

            print(' '+result, end=' ')
            result = result.strip()

            # check if merge job was submitted and updating jobdatabase
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search('Job <(\d+)> is submitted', result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:',result, end=' ')

        i +=1
        # end of while on merge jobs


# persist the updated JOBSTATUS/JOBID information back to mps.db
lib.write_db()
def forward_proxy(rundir)
Definition: mps_fire.py:27
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None)
Definition: mps_fire.py:121
def write_HTCondor_submit_file_pede(path, script, config, lib)
Definition: mps_fire.py:42
#define str(s)
double split
Definition: MVATrainer.cc:139