
mps_fire.py
1 #!/usr/bin/env python
2 # Submit jobs that are set up in the local MPS database to the batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job directory.
8 # There may be other options such as -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
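# Illustrative example invocations (based on the options parsed below; values are hypothetical):
#
#   mps_fire.py 3        # submit up to 3 Mille jobs that are in SETUP state
#   mps_fire.py -a       # submit all set-up Mille jobs
#   mps_fire.py -m       # submit the Pede merge job(s)
#   mps_fire.py -m -f    # force the Pede submission even if some Mille jobs are not OK
#   mps_fire.py -p 5     # forward the VOMS proxy and submit up to 5 Mille jobs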
14 
15 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
16 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
17 import os
18 import sys
19 import glob
20 import shutil
21 import cPickle
22 import subprocess
23 import re
24 import argparse
25 
26 def forward_proxy(rundir):
27  """Forward proxy to location visible from the batch system.
28 
29  Arguments:
30  - `rundir`: directory for storing the forwarded proxy
31  """
32 
33  if not mps_tools.check_proxy():
34  print "Please create proxy via 'voms-proxy-init -voms cms -rfc'."
35  sys.exit(1)
36 
37  local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
38  shutil.copyfile(local_proxy, os.path.join(rundir,".user_proxy"))
39 
40 
41 def write_HTCondor_submit_file(path, script, config, lib):
42  """Writes 'job.submit' file in `path`.
43 
44  Arguments:
45  - `path`: job directory
46  - `script`: script to be executed
47  - `config`: cfg file
48  - `lib`: MPS lib object
49  """
50 
51  resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
52  job_flavour = resources[-1]
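 # e.g. (illustrative): a pede class of "htcondor_bigmem_espresso" would give
 # resources = ["bigmem", "espresso"] and job_flavour = "espresso"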
53 
54  job_submit_template="""\
55 universe = vanilla
56 executable = {script:s}
57 output = {jobm:s}/STDOUT
58 error = {jobm:s}/STDOUT
59 log = {jobm:s}/HTCJOB
60 notification = Always
61 transfer_output_files = ""
62 request_memory = {pedeMem:d}M
63 
64 # adapted to space used on eos for binaries:
65 request_disk = {disk:d}
66 
67 # adapted to threads parameter in pede options and number of available cores
68 request_cpus = {cpus:d}
69 
70 +JobFlavour = "{flavour:s}"
71 """
72  if "bigmem" in resources:
73  job_submit_template += """\
74 +BigMemJob = True
75 +AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"
76 
77 # automatically remove the job if the submitter has no permissions to run a BigMemJob
78 periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
79 """
80  job_submit_template += "\nqueue\n"
81 
82  print "Determine number of pede threads..."
83  cms_process = mps_tools.get_process_object(os.path.join(path, config))  # use the function arguments, not globals
84  pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
85  n_threads = 1
86  for option in pede_options:
87  if "threads" in option:
88  n_threads = option.replace("threads", "").strip()
89  n_threads = max(map(lambda x: int(x), n_threads.split()))
90  break
91  if n_threads > 16: n_threads = 16 # HTCondor machines have (currently) 16
92  # cores, i.e. we ensure here that the job
93  # would fit core-wise on one machine
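 # e.g. (illustrative): a pede option "threads 10 5" gives n_threads = max(10, 5) = 10,
 # while "threads 32" would be capped to 16 by the check above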
94 
95  print "Determine required disk space on remote host..."
96  # determine usage by each file instead of whole directory as this is what
97  # matters for the specified disk usage:
98  spco = subprocess.check_output # to make the code below less verbose
99  opj = os.path.join # ditto
100  cmd = ["du", "--apparent-size"]
101  disk_usage = [int(item.split()[0])
102  for directory in ("binaries", "monitors", "tree_files")
103  for item
104  in spco(cmd+
105  glob.glob(opj(lib.mssDir, directory, "*"))).splitlines()]
106  disk_usage = sum(disk_usage)
107  disk_usage *= 1.1 # reserve 10% additional space
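 # note (assumption): "du" reports sizes in KiB blocks by default and a plain number in
 # HTCondor's request_disk is also interpreted as KiB, so no unit conversion is done here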
108 
109  job_submit_file = os.path.join(path, "job.submit")
110  with open(job_submit_file, "w") as f:
111  f.write(job_submit_template.format(script = os.path.abspath(script),
112  jobm = os.path.abspath(path),
113  pedeMem = lib.pedeMem,
114  disk = int(disk_usage),
115  cpus = n_threads,
116  flavour = job_flavour))
117 
118  return job_submit_file
119 
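# For illustration only: with hypothetical values (pedeMem = 32000, 10 pede threads,
# roughly 2 GB of binaries and the "espresso" flavour), the rendered job.submit would
# look roughly like this (template comment lines omitted):
#
#   universe = vanilla
#   executable = /path/to/jobData/jobm/theScript.sh
#   output = /path/to/jobData/jobm/STDOUT
#   error = /path/to/jobData/jobm/STDOUT
#   log = /path/to/jobData/jobm/HTCJOB
#   notification = Always
#   transfer_output_files = ""
#   request_memory = 32000M
#   request_disk = 2200000
#   request_cpus = 10
#   +JobFlavour = "espresso"
#
#   queue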
120 
121 
122 
123 parser = argparse.ArgumentParser(
124  description="Submit jobs that are set up in the local MPS database to the batch system.",
125 )
126 parser.add_argument("maxJobs", type=int, nargs='?', default=1,
127  help="number of Mille jobs to be submitted (default: %(default)d)")
128 parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
129  help = ("job IDs to be submitted; "
130  "use either 'job<ID>' or directly '<ID>'"))
131 parser.add_argument("-a", "--all", dest="allMille", default=False,
132  action="store_true",
133  help = ("submit all setup Mille jobs; "
134  "maxJobs and --job-id are ignored"))
135 parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
136  action="store_true",
137  help = ("submit all setup Pede jobs; "
138  "maxJobs is ignored, but --job-id is respected"))
139 parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
140  action="store_true",
141  help=("force the submission of the Pede job in case some "+
142  "Mille jobs are not in the OK state"))
143 parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
144  action="store_true",
145  help="forward VOMS proxy to batch system")
146 args = parser.parse_args(sys.argv[1:])
147 
148 
149 lib = mpslib.jobdatabase()
150 lib.read_db()
151 
152 if args.allMille:
153  # submit all Mille jobs and ignore 'maxJobs' supplied by user
154  args.maxJobs = lib.nJobs
155  args.job_id = None
156 
157 if args.job_id is None:
158  job_mask = lib.JOBDIR
159 else:
160  job_mask = []
161  for job_id in args.job_id:
162  invalid_id = False
163  if job_id.startswith("job"): job_mask.append(job_id)
164  elif job_id.startswith("m"): job_mask.append("job"+job_id)
165  else:
166  try:
167  job_mask.append(lib.JOBDIR[int(job_id)-1])
168  except ValueError:
169  invalid_id = True
170  except IndexError:
171  print "ID provided to '-j/--job-id' is out of range:", job_id
172  sys.exit(1)
173 
174  if invalid_id or job_mask[-1] not in lib.JOBDIR:
175  print "ID provided to '-j/--job-id' is invalid:", job_id
176  print "'-j/--job-id' requires the IDs to exist and to be in one",
177  print "of the following formats:"
178  print " - job042"
179  print " - 042"
180  print " - jobm1"
181  print " - m1"
182  sys.exit(1)
183 
184 # build the absolute job directory path (needed by mps_script)
185 theJobData = os.path.join(os.getcwd(), "jobData")
186 
187 # set the job name (lib.addFiles is used if it is non-empty)
188 theJobName = 'mpalign'
189 if lib.addFiles != '':
190  theJobName = lib.addFiles
191 
192 fire_htcondor = False
193 
194 # fire the 'normal' parallel Jobs (Mille Jobs)
195 if not args.fireMerge:
196  # set the resources string coming from mps.db
197  resources = lib.get_class('mille')
198 
199  # "cmscafspec" found in $resources: special cmscaf resources
200  if 'cmscafspec' in resources:
201  print '\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n'
202  resources = '-q cmscafalcamille'
203  # "cmscaf" found in $resources
204  elif 'cmscaf' in resources:
205  # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
206  resources = '-q'+resources+' -m g_cmscaf'
207  elif "htcondor" in resources:
208  fire_htcondor = True
209  else:
210  resources = '-q '+resources
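 # e.g. (illustrative): a mille class of 'cmscaf1nd' becomes '-qcmscaf1nd -m g_cmscaf',
 # whereas any class containing 'htcondor' only sets fire_htcondor = True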
211 
212  nSub = 0 # number of submitted Jobs
213  for i in xrange(lib.nJobs):
214  if lib.JOBDIR[i] not in job_mask: continue
215  if lib.JOBSTATUS[i] == 'SETUP':
216  if nSub < args.maxJobs:
217  if args.forwardProxy:
218  forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))
219 
220  # submit a new job with 'bsub -J ...' and check output
221  # for some reason LSF wants the script with its full path
222  submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
223  (theJobName, resources, theJobData, lib.JOBDIR[i])
224  print submission
225  try:
226  result = subprocess.check_output(submission,
227  stderr=subprocess.STDOUT,
228  shell=True)
229  except subprocess.CalledProcessError as e:
230  result = "" # -> check for successful job submission will fail
231  print ' '+result,
232  result = result.strip()
233 
234  # check if the job was submitted and update the job database
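 # (illustrative) a successful LSF submission typically prints something like
 # "Job <12345> is submitted to queue <cmscafalcamille>.", which the regexp below matches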
235  match = re.search('Job <(\d+)> is submitted', result)
236  if match:
237  # need standard format for job number
238  lib.JOBSTATUS[i] = 'SUBTD'
239  lib.JOBID[i] = match.group(1)
240  else:
241  print 'Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result),
242  nSub +=1
243 
244 # fire the merge job
245 else:
246  print 'fire merge'
247  # set the resources string coming from mps.db
248  resources = lib.get_class('pede')
249  if 'cmscafspec' in resources:
250  resources = '-q cmscafalcamille'
251  elif "htcondor" in resources:
252  fire_htcondor = True
253  else:
254  resources = '-q '+resources
255 
256  if not fire_htcondor:
257  # Allocate memory for pede job FIXME check documentation for bsub!!!!!
258  resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl
259 
260  # check whether all other jobs are OK
261  mergeOK = True
262  for i in xrange(lib.nJobs):
263  if lib.JOBSTATUS[i] != 'OK':
264  if 'DISABLED' not in lib.JOBSTATUS[i]:
265  mergeOK = False
266  break
267 
268  # loop over merge jobs
269  i = lib.nJobs
270  while i<len(lib.JOBDIR):
271  jobNumFrom1 = i+1
272  if lib.JOBDIR[i] not in job_mask:
273  i += 1
274  continue
275 
276  # check whether the current job is in SETUP state or submission is forced
277  if lib.JOBSTATUS[i] != 'SETUP':
278  print 'Merge job %d status %s not submitted.' % \
279  (jobNumFrom1, lib.JOBSTATUS[i])
280  elif not (mergeOK or args.forceMerge):
281  print 'Merge job',jobNumFrom1,'not submitted since some Mille jobs failed or are unfinished (use -m -f to force).'
282  else:
283  # some paths for clarity
284  Path = os.path.join(theJobData,lib.JOBDIR[i])
285  backupScriptPath = os.path.join(Path, "theScript.sh.bak")
286  scriptPath = os.path.join(Path, "theScript.sh")
287 
288  # force option invoked:
289  if args.forceMerge:
290 
291  # make a backup copy of the script first, if a backup doesn't already exist
292  if not os.path.isfile(backupScriptPath):
293  os.system('cp -p '+scriptPath+' '+backupScriptPath)
294 
295  # get the name of merge cfg file -> either the.py or alignment_merge.py
296  command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
297  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
298  mergeCfg = mergeCfg.strip()
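 # e.g. (illustrative): a line 'CONFIG_FILE=/path/to/jobData/jobm1/alignment_merge.py'
 # in the backup script yields mergeCfg = 'alignment_merge.py' via the awk filter above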
299 
300  if fire_htcondor:
301  job_submit_file = write_HTCondor_submit_file(Path, scriptPath, mergeCfg, lib)
302 
303  # make a backup copy of the cfg
304  backupCfgPath = os.path.join(Path, mergeCfg+".bak")
305  cfgPath = os.path.join(Path, mergeCfg)
306  if not os.path.isfile(backupCfgPath):
307  os.system('cp -p '+cfgPath+' '+backupCfgPath)
308 
309  # retrieve weights configuration
310  with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
311  weight_conf = cPickle.load(f)
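 # weight_conf is expected to hold (name, weight) pairs as consumed by the loop below,
 # e.g. [("Dataset1", "1.0"), ("Dataset2", "0.5")] (illustrative values)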
312 
313  # blank weights
314  mps_tools.run_checked(["mps_weight.pl", "-c"])
315 
316  # apply weights
317  for name,weight in weight_conf:
318  print " ".join(["mps_weight.pl", "-N", name, weight])
319  mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])
320 
321  # rewrite the mergeCfg using only 'OK' jobs (uses the first Mille job as base config)
322  inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
323  command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
324  os.system(command)
325 
326  # rewrite theScript.sh using only 'OK' jobs
327  command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
328  os.system(command)
329 
330  else:
331  # restore the backup copy of the script
332  if os.path.isfile(backupScriptPath):
333  os.system('cp -pf '+backupScriptPath+' '+scriptPath)
334 
335  # get the name of merge cfg file
336  command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
337  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
338  command = 'basename '+mergeCfg
339  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
340  mergeCfg = mergeCfg.replace('\n','')
341 
342  if fire_htcondor:
343  job_submit_file = write_HTCondor_submit_file(Path, scriptPath, mergeCfg, lib)
344 
345  # restore the backup copy of the cfg
346  backupCfgPath = Path+'/%s.bak' % mergeCfg
347  cfgPath = Path+'/%s' % mergeCfg
348  if os.path.isfile(backupCfgPath):
349  os.system('cp -pf '+backupCfgPath+' '+cfgPath)
350 
351  # end of if/else forceMerge
352 
353  # submit merge job
354  nMerge = i-lib.nJobs # 'index' of this merge job
355  curJobName = 'm'+str(nMerge)+'_'+theJobName
356  if args.forwardProxy: forward_proxy(Path)
357  if fire_htcondor:
358  submission = ["condor_submit",
359  "-batch-name", curJobName,
360  job_submit_file]
361  else:
362  submission = ["bsub", "-J", curJobName, resources, scriptPath]
363  for _ in xrange(5):
364  try:
365  result = subprocess.check_output(submission, stderr=subprocess.STDOUT)
366  break
367  except subprocess.CalledProcessError as e:
368  result = e.output
369 
370  print ' '+result,
371  result = result.strip()
372 
373  # check if the merge job was submitted and update the job database
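 # (illustrative) condor_submit typically ends its output with a line like
 # "1 job(s) submitted to cluster 1234567.", which the HTCondor regexp below matches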
374  if fire_htcondor:
375  match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
376  else:
377  match = re.search('Job <(\d+)> is submitted', result)
378  if match:
379  lib.JOBSTATUS[i] = 'SUBTD'
380  lib.JOBID[i] = match.group(1)
381  # need standard format for job number
382  if fire_htcondor: lib.JOBID[i] += ".0"
383  print "jobid is", lib.JOBID[i]
384  else:
385  print 'Submission of merge job seems to have failed:',result,
386 
387  i +=1
388  # end of while on merge jobs
389 
390 
391 lib.write_db()