
mps_fire.py
#!/usr/bin/env python
# Submit jobs that are set up in the local mps database to the batch system.
#
# The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
# The jobname will be something like MP_2015.
# The queue name is derived from lib.classInfo.
# The program is theScript.sh located in each job directory.
# There may be other options such as -R (see man bsub for info).
#
# Usage:
#
# mps_fire.py [-a] [-m [-f]] [maxjobs]
# mps_fire.py -h
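#
# Examples (matching the argument parser defined below):
#
# mps_fire.py 10     # submit up to 10 Mille jobs in SETUP state
# mps_fire.py -a     # submit all setup Mille jobs
# mps_fire.py -m     # submit the Pede merge job(s)
# mps_fire.py -m -f  # force the merge even if some Mille jobs are not OK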

from __future__ import print_function
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import os
import sys
import glob
import shlex
import shutil
import cPickle
import subprocess
import re
import argparse

def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if not mps_tools.check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
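    # note: the submitted job script is then expected to pick up the
    # '.user_proxy' file from its run directory (e.g. via X509_USER_PROXY);
    # this is an assumption about theScript.sh, not enforced here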


def write_HTCondor_submit_file(path, script, config, lib):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `config`: cfg file
    - `lib`: MPS lib object
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]
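    # e.g. a pede class of "htcondor_bigmem_espresso" yields
    # resources = ["bigmem", "espresso"] and job_flavour = "espresso"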

    job_submit_template = """\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
73  if "bigmem" in resources:
74  job_submit_template += """\
75 +BigMemJob = True
76 +AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"
77 
78 # automatically remove the job if the submitter has no permissions to run a BigMemJob
79 periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
80 """
81  job_submit_template += "\nqueue\n"
82 
83  print("Determine number of pede threads...")
84  cms_process = mps_tools.get_process_object(os.path.join(Path, mergeCfg))
85  pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
86  n_threads = 1
87  for option in pede_options:
88  if "threads" in option:
89  n_threads = option.replace("threads", "").strip()
90  n_threads = max(map(lambda x: int(x), n_threads.split()))
91  break
92  if n_threads > 16: n_threads = 16 # HTCondor machines have (currently) 16
93  # cores, i.e. we ensure here that the job
94  # would fit core-wise on one machine
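    # e.g. a pede option string "threads 10 1" yields n_threads = 10
    # (the maximum of the listed values)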

    print("Determine required disk space on remote host...")
    # determine usage by each file instead of whole directory as this is what
    # matters for the specified disk usage:
    spco = subprocess.check_output # to make code below less verbose
    opj = os.path.join             # ditto
    cmd = ["du", "--apparent-size"]
    disk_usage = [int(item.split()[0])
                  for directory in ("binaries", "monitors", "tree_files")
                  for item
                  in spco(cmd +
                          glob.glob(opj(lib.mssDir, directory, "*"))).splitlines()]
    disk_usage = sum(disk_usage)
    disk_usage *= 1.1 # reserve 10% additional space
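    # note: 'du --apparent-size' prints sizes in 1 KiB blocks by default,
    # which matches the default unit HTCondor assumes for request_disk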

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file


parser = argparse.ArgumentParser(
    description="Submit jobs that are set up in the local mps database to the batch system.")
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
                    help = ("job IDs to be submitted; "
                            "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help = ("submit all setup Mille jobs; "
                            "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help = ("submit all setup Pede jobs; "
                            "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state. Unlike --force-merge "+
                          "this option assumes the user has edited theScript.sh and "+
                          "alignment_merge.py to consistently pick up only the mille "+
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])


lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"): job_mask.append(job_id)
        elif job_id.startswith("m"): job_mask.append("job"+job_id)
        else:
            try:
                job_mask.append(lib.JOBDIR[int(job_id)-1])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be in", end=' ')
            print("one of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# set the job name (use the name stored in the database if available)
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

fire_htcondor = False

# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in $resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in $resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q '+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    nSub = 0 # number of submitted Jobs
    for i in xrange(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] == 'SETUP':
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))

                # submit a new job with 'bsub -J ...' and check output
                # for some reason LSF wants the script with full path
                submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                             (theJobName, resources, theJobData, lib.JOBDIR[i])
                print(submission)
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True)
                except subprocess.CalledProcessError as e:
                    result = "" # -> check for successful job submission will fail
                print(' '+result, end=' ')
                result = result.strip()

                # check if the job was submitted and update the job database
                match = re.search(r'Job <(\d+)> is submitted', result)
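                # (on success LSF prints e.g.
                # "Job <12345> is submitted to queue <cmscaf1nd>.")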
                if match:
                    # need standard format for job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
                nSub += 1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all other jobs are OK
    mergeOK = True
    for i in xrange(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break

    # loop over merge jobs
    i = lib.nJobs
    while i < len(lib.JOBDIR):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check whether the current job is in SETUP state or submission is forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
        else:
            # some paths for clarity
            Path = os.path.join(theJobData,lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            # force option invoked:
            if args.forceMerge:

                # make a backup copy of the script first, if it doesn't already exist
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of the merge cfg file -> either the.py or alignment_merge.py
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.strip()
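                # (e.g. a script line "CONFIG_FILE=jobData/jobm/alignment_merge.py"
                # yields mergeCfg = "alignment_merge.py")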

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file(Path, scriptPath, mergeCfg, lib)

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = cPickle.load(f)

                # blank weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply weights
                for name,weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command = 'mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # restore the backup copy of the script
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of the merge cfg file
                command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.replace('\n','')

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file(Path, scriptPath, mergeCfg, lib)

                # restore the backup copy of the cfg
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # end of if/else forceMerge

            # submit merge job
            nMerge = i-lib.nJobs # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                # 'resources' holds several options (e.g. '-q <queue> -R ...'),
                # so split it into separate arguments for subprocess
                submission = ["bsub", "-J", curJobName] + shlex.split(resources) + [scriptPath]
            for _ in xrange(5):
                try:
                    result = subprocess.check_output(submission, stderr=subprocess.STDOUT)
                    break
                except subprocess.CalledProcessError as e:
                    result = e.output

            print(' '+result, end=' ')
            result = result.strip()

            # check if the merge job was submitted and update the job database
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r'Job <(\d+)> is submitted', result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
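                # (HTCondor job IDs have the form <cluster>.<proc>; condor_submit
                # reports only the cluster number and a single submitted job
                # always gets proc 0, hence the ".0")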
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:',result, end=' ')

        i += 1
        # end of while on merge jobs


lib.write_db()