CMS 3D CMS Logo

mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 from __future__ import print_function
16 from builtins import range
17 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
18 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
19 import os
20 import sys
21 import glob
22 import shutil
23 if sys.version_info[0]>2:
24  import _pickle as cPickle
25 else:
26  import cPickle
27 import subprocess
28 import re
29 import argparse
30 
def forward_proxy(rundir):
    """Copy the user's VOMS proxy into `rundir` as '.user_proxy'.

    The proxy is forwarded to a location visible from the batch system.
    If `mps_tools.check_proxy()` reports no usable proxy, a hint is printed
    and the program exits with status 1.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    proxy_available = mps_tools.check_proxy()
    if not proxy_available:
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # ask the VOMS tools where the current proxy file lives
    proxy_query = ["voms-proxy-info", "--path"]
    source = subprocess.check_output(proxy_query).strip()
    destination = os.path.join(rundir, ".user_proxy")
    shutil.copyfile(source, destination)
44 
45 
def write_HTCondor_submit_file_pede(path, script, config, lib):
    """Writes 'job.submit' file for a pede job in `path`.

    The requested number of CPUs is derived from the 'threads' entry of the
    pede options found in the cfg file, the requested disk space from the
    apparent size of the files below `lib.mssDir`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `config`: cfg file (name relative to `path`)
    - `lib`: MPS lib object

    Returns the path of the written submit file.
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
    if "bigmem" in resources:
        job_submit_template += """\
+BigMemJob = True
+AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"

# automatically remove the job if the submitter has no permissions to run a BigMemJob
periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
"""
    job_submit_template += "\nqueue\n"

    print("Determine number of pede threads...")
    # use the function arguments here, not module-level variables of the
    # calling script, so that the function works for any caller
    cms_process = mps_tools.get_process_object(os.path.join(path, config))
    pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
    n_threads = 1
    for option in pede_options:
        if "threads" in option:
            # the option may carry several numbers -> take the largest one
            thread_values = option.replace("threads", "").split()
            n_threads = max(int(value) for value in thread_values)
            break
    # HTCondor machines have (currently) 16 cores, i.e. we ensure here that
    # the job would fit core-wise on one machine
    n_threads = min(n_threads, 16)

    print("Determine required disk space on remote host...")
    # determine usage by each file instead of whole directory as this is what
    # matters for the specified disk usage:
    du_command = ["du", "--apparent-size"]
    disk_usage = 0
    for directory in ("binaries", "monitors", "tree_files"):
        files = glob.glob(os.path.join(lib.mssDir, directory, "*"))
        if not files:
            # 'du' without file operands would measure the current working
            # directory instead, which must not enter the estimate
            continue
        for line in subprocess.check_output(du_command + files).splitlines():
            disk_usage += int(line.split()[0])
    disk_usage *= 1.1 # reserve 10% additional space

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file
124 
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
    """Writes 'job.submit' file for a mille job in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `lib`: MPS lib object
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the path of the written submit file.
    """

    resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""

+JobFlavour = "{flavour:s}"
"""
    if "cafalca" in resources:
        job_submit_template += """\
+CAFJob = True
+AccountingGroup = "group_u_CMS.CAF.ALCA"
# automatically remove the job if the submitter has no permissions to run a CAF Job
periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True
"""

    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += "\nqueue\n"

    # write into the directory passed as argument instead of relying on a
    # module-level variable of the calling script
    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        # 'proxy' is only consumed if the proxy line was added above
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           flavour = job_flavour,
                                           proxy = proxy_path))

    return job_submit_file
171 
172 
173 
# Command line interface: an optional positional 'maxJobs' plus flags that
# select which jobs are submitted (Mille vs. Pede) and how.
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
                    help = ("job IDs to be submitted; "
                            "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help = ("submit all setup Mille jobs; "
                            "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help = ("submit all setup Pede jobs; "
                            "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
# the help text refers to the real flag name '--force-merge'
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state. Unlike --force-merge "+
                          "this option assumes the user has edited theScript.sh and "+
                          "alignment_merge.py to consistently pick up only the mille "+
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
205 
206 
# read the local mps database
lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

# 'job_mask' holds the job directories that are eligible for submission
if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"):
            job_mask.append(job_id)
        elif job_id.startswith("m"):
            job_mask.append("job"+job_id)
        else:
            try:
                job_index = int(job_id) - 1
                # a non-positive ID would silently wrap around to the end of
                # the list via negative indexing -> treat it as out of range
                if job_index < 0:
                    raise IndexError(job_id)
                job_mask.append(lib.JOBDIR[job_index])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        # IDs given by name still have to exist in the database
        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)
241 
# absolute path of the directory holding all job directories
# (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# job name: 'mpalign' unless the database provides a non-empty 'addFiles'
theJobName = lib.addFiles if lib.addFiles != '' else 'mpalign'

# switched on below if the configured job class requests HTCondor
fire_htcondor = False
251 
# Submit either the Mille jobs or (with -m/--merge) the Pede merge jobs that
# are in 'SETUP' state, record status and batch job ID of every successful
# submission in 'lib', and finally write the database back to disk.
#
# All outputs of external commands are requested as text
# (universal_newlines=True) because they are combined with 'str' objects
# below; without it Python 3 returns 'bytes' and the concatenations fail.
import shlex # needed to split the LSF resource string for list-style calls

# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    #set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in $resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in $resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q '+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    nSub = 0 # number of submitted Jobs
    for i in range(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] != 'SETUP': continue
        if nSub >= args.maxJobs: continue

        if args.forwardProxy:
            forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))

        # submit a new job with 'bsub -J ...' and check output
        # for some reasons LSF wants script with full path
        if fire_htcondor:
            Path = os.path.join(theJobData,lib.JOBDIR[i])
            scriptPath = os.path.join(Path, "theScript.sh")
            if args.forwardProxy:
                job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib,os.path.join(Path,".user_proxy"))
            else:
                job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
            submission = "condor_submit -batch-name %s %s"%\
                         (theJobName, job_submit_file)
        else:
            submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                         (theJobName, resources, theJobData, lib.JOBDIR[i])
        print(submission)
        try:
            result = subprocess.check_output(submission,
                                             stderr=subprocess.STDOUT,
                                             shell=True,
                                             universal_newlines=True)
        except subprocess.CalledProcessError:
            result = "" # -> check for successful job submission will fail
        print(' '+result, end=' ')
        result = result.strip()

        # check if job was submitted and updating jobdatabase
        if fire_htcondor:
            match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
        else:
            match = re.search(r'Job <(\d+)> is submitted', result)
        if match:
            # need standard format for job number
            lib.JOBSTATUS[i] = 'SUBTD'
            lib.JOBID[i] = match.group(1)
            if fire_htcondor: lib.JOBID[i] += ".0"
        else:
            print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
        # failed attempts count as well towards 'maxJobs'
        nSub +=1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        resources = resources+' -R "rusage[mem="%s"]"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all other jobs are OK (disabled jobs are ignored)
    mergeOK = True
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break

    # loop over merge jobs (they are stored after the Mille jobs)
    i = lib.nJobs
    while i<len(lib.JOBDIR):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check if current job in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
        else:
            # some paths for clarity
            Path = os.path.join(theJobData,lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            # force option invoked:
            if args.forceMerge:

                # make a backup copy of the script first, if it doesn't already exist.
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of merge cfg file -> either the.py or alignment_merge.py
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT,
                                                   shell=True, universal_newlines=True)
                mergeCfg = mergeCfg.strip()

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = cPickle.load(f)

                # blank weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply weights
                for name,weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # restore the backup copy of the script
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of merge cfg file
                command = "cat "+scriptPath+r" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT,
                                                   shell=True, universal_newlines=True)
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT,
                                                   shell=True, universal_newlines=True)
                mergeCfg = mergeCfg.replace('\n','')

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # restore the backup copy of the cfg
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # end of if/else forceMerge

            # submit merge job
            nMerge = i-lib.nJobs # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                # the command is run without a shell, i.e. the resource string
                # has to be split into its individual arguments here
                submission = (["bsub", "-J", curJobName]
                              + shlex.split(resources)
                              + [scriptPath])
            # retry a failing submission up to five times
            for _ in range(5):
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     universal_newlines=True)
                    break
                except subprocess.CalledProcessError as e:
                    result = e.output

            print(' '+result, end=' ')
            result = result.strip()

            # check if merge job was submitted and updating jobdatabase
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r'Job <(\d+)> is submitted', result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:',result, end=' ')

        i +=1
        # end of while on merge jobs


lib.write_db()
std::vector< std::string_view > split(std::string_view, const char *)
def forward_proxy(rundir)
Definition: mps_fire.py:31
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None)
Definition: mps_fire.py:125
def write_HTCondor_submit_file_pede(path, script, config, lib)
Definition: mps_fire.py:46
#define str(s)