CMS 3D CMS Logo

mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 from __future__ import print_function
16 from builtins import range
17 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
18 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
19 import os
20 import sys
21 import glob
22 import shutil
23 if sys.version_info[0]>2:
24  import _pickle as cPickle
25 else:
26  import cPickle
27 import subprocess
28 import re
29 import argparse
30 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Copies the local VOMS proxy file (as reported by
    ``voms-proxy-info --path``) to ``<rundir>/.user_proxy``. Exits the
    program with status 1 if ``mps_tools.check_proxy()`` reports no proxy.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if not mps_tools.check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # read the output as text: under Python 3 check_output returns bytes,
    # which would give a bytes source path next to a str destination path
    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"],
                                          universal_newlines=True).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
44 
45 
def _pede_thread_count(pede_options, max_threads=16):
    """Return the number of CPUs to request for a pede job.

    The first entry of `pede_options` containing 'threads' (e.g.
    'threads 10 1') determines the value: the largest integer following the
    keyword is used. Without such an entry one thread is assumed. The result
    is capped at `max_threads` (HTCondor machines currently have 16 cores),
    so that the job fits core-wise on a single machine.
    """
    n_threads = 1
    for option in pede_options:
        if "threads" in option:
            n_threads = max(int(x) for x in option.replace("threads", "").split())
            break
    return min(n_threads, max_threads)


def _remote_disk_usage(mss_dir):
    """Return the summed apparent size of the pede input files below `mss_dir`.

    The files in the 'binaries', 'monitors' and 'tree_files' subdirectories
    are measured individually with `du --apparent-size` (result in du's
    default block units, 1 KiB for GNU du), because the usage of each file,
    not of the whole directory, is what matters for the requested disk space.
    """
    total = 0
    for directory in ("binaries", "monitors", "tree_files"):
        files = glob.glob(os.path.join(mss_dir, directory, "*"))
        if not files:
            # without file arguments `du` would measure the current directory
            continue
        output = subprocess.check_output(["du", "--apparent-size"] + files,
                                         universal_newlines=True)
        total += sum(int(line.split()[0]) for line in output.splitlines())
    return total


def write_HTCondor_submit_file_pede(path, script, config, lib):
    """Writes 'job.submit' file in `path`.

    The requested number of CPUs follows the 'threads' option of the pede
    steerer found in the cfg `config`. The requested disk space is the size
    of the pede input files below `lib.mssDir` plus a 10% margin.

    Arguments:
    - `path`: job directory; the submit file is written there
    - `script`: script to be executed
    - `config`: cfg file name, looked up inside `path`
    - `lib`: MPS lib object

    Returns the path of the written submit file.
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
    if "bigmem" in resources:
        job_submit_template += """\
+BigMemJob = True
+AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"

# automatically remove the job if the submitter has no permissions to run a BigMemJob
periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
"""
    job_submit_template += "\nqueue\n"

    print("Determine number of pede threads...")
    # read the cfg from the job directory passed in as argument
    cms_process = mps_tools.get_process_object(os.path.join(path, config))
    pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
    n_threads = _pede_thread_count(pede_options)

    print("Determine required disk space on remote host...")
    disk_usage = _remote_disk_usage(lib.mssDir) * 1.1 # reserve 10% additional space

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file
124 
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
    """Writes 'job.submit' file in `path`.

    The job flavour is the last '_'-separated field of the 'mille' class in
    the MPS database. A 'cafalca' field adds the CAF accounting settings.

    Arguments:
    - `path`: job directory; the submit file is written there
    - `script`: script to be executed
    - `lib`: MPS lib object
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the path of the written submit file.
    """

    resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""

+JobFlavour = "{flavour:s}"
"""
    if "cafalca" in resources:
        job_submit_template += """\
+CAFJob = True
+AccountingGroup = "group_u_CMS.CAF.ALCA"
# automatically remove the job if the submitter has no permissions to run a CAF Job
periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True
"""

    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += "\nqueue\n"

    # write into the job directory passed in as argument
    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           flavour = job_flavour,
                                           proxy = proxy_path))

    return job_submit_file
171 
172 
173 
# command line interface
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest="job_id", nargs="*",
                    help=("job IDs to be submitted; "
                          "use either 'job<ID>' or directly '<ID>' "
                          "(merge jobs: 'jobm<N>' or 'm<N>')"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help=("submit all setup Mille jobs; "
                          "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help=("submit all setup Pede jobs; "
                          "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state"))
# the help text refers to the option by its real name '--force-merge'
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state. Unlike --force-merge "
                          "this option assumes the user has edited theScript.sh and "
                          "alignment_merge.py to consistently pick up only the mille "
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
205 
206 
lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

# Build the list of job directories that may be submitted. Accepted ID
# formats: 'job042', 'jobm1' and 'm1' (taken as directory names) and '042'
# (1-based position in lib.JOBDIR).
if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"):
            job_mask.append(job_id)
        elif job_id.startswith("m"):
            job_mask.append("job"+job_id)
        else:
            try:
                job_index = int(job_id) - 1
            except ValueError:
                invalid_id = True
            else:
                # reject IDs < 1 explicitly: a negative index would silently
                # wrap around and select a job from the end of lib.JOBDIR
                if not 0 <= job_index < len(lib.JOBDIR):
                    print("ID provided to '-j/--job-id' is out of range:", job_id)
                    sys.exit(1)
                job_mask.append(lib.JOBDIR[job_index])

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)
241 
# absolute path of the job data directory; the submission commands below
# refer to the job scripts by their full path
theJobData = os.path.join(os.getcwd(), "jobData")

# batch job name: lib.addFiles when it is non-empty, 'mpalign' otherwise
theJobName = lib.addFiles if lib.addFiles != '' else 'mpalign'

# set to True further down when the jobs go to HTCondor instead of LSF
fire_htcondor = False
251 
# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in $resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in $resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs
        # ('-q' and the queue name separated, as in the other branches)
        resources = '-q '+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
        # read as text: under Python 3 check_output otherwise returns bytes
        # and the substring tests below raise a TypeError
        schedinfo = subprocess.check_output(["myschedd", "show"],
                                            universal_newlines=True)
        if 'cafalca' in resources:
            if 'tzero' not in schedinfo:
                print("\nMPS fire: request to use CAF pool which has not been set up. Call `module load lxbatch/tzero` and try again")
                sys.exit(1)
        elif 'share' not in schedinfo:
            print("\nMPS fire: request to use standard pool when CAF pool is set up. Call `module load lxbatch/share` and try again")
            sys.exit(1)
    else:
        resources = '-q '+resources

    nSub = 0 # number of submitted Jobs
    for i in range(lib.nJobs):
        # only selected jobs in SETUP state, up to the requested number
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] != 'SETUP' or nSub >= args.maxJobs: continue

        if args.forwardProxy:
            forward_proxy(os.path.join(theJobData, lib.JOBDIR[i]))

        if fire_htcondor:
            Path = os.path.join(theJobData, lib.JOBDIR[i])
            scriptPath = os.path.join(Path, "theScript.sh")
            if args.forwardProxy:
                job_submit_file = write_HTCondor_submit_file_mille(
                    Path, scriptPath, lib, os.path.join(Path, ".user_proxy"))
            else:
                job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
            submission = "condor_submit -batch-name %s %s" % (theJobName, job_submit_file)
        else:
            # submit with 'bsub -J ...'; for some reasons LSF wants the script with full path
            submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                (theJobName, resources, theJobData, lib.JOBDIR[i])
        print(submission)
        try:
            result = subprocess.check_output(submission,
                                             stderr=subprocess.STDOUT,
                                             shell=True,
                                             universal_newlines=True)
        except subprocess.CalledProcessError as e:
            # keep the error output so that the failure message below shows
            # the reason (a failed submission is not expected to contain the
            # success message matched below)
            result = e.output
        print(' '+result, end=' ')
        result = result.strip()

        # check if job was submitted and update the job database
        if fire_htcondor:
            match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
        else:
            match = re.search(r"Job <(\d+)> is submitted", result)
        if match:
            lib.JOBSTATUS[i] = 'SUBTD'
            # need standard format for job number
            lib.JOBID[i] = match.group(1) + (".0" if fire_htcondor else "")
        else:
            print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i], result), end=' ')
        nSub += 1

# fire the merge job
else:
    import shlex # used to split the LSF resource options into argv entries

    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
        # read as text (see the Mille branch above)
        schedinfo = subprocess.check_output(["myschedd", "show"],
                                            universal_newlines=True)
        if 'bigmem' in resources and 'share' not in schedinfo:
            print("\nMPS fire: CAF pool is set up, but request to use high-memory machines which live in the standard pool. Call `module load lxbatch/share` and try again")
            sys.exit(1)
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all Mille jobs are OK (disabled jobs do not block the merge)
    mergeOK = all(lib.JOBSTATUS[i] == 'OK' or 'DISABLED' in lib.JOBSTATUS[i]
                  for i in range(lib.nJobs))

    # loop over merge jobs (they follow the Mille jobs in lib.JOBDIR)
    for i in range(lib.nJobs, len(lib.JOBDIR)):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            continue

        # check if current job in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
            continue
        if not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
            continue

        # some paths for clarity
        Path = os.path.join(theJobData, lib.JOBDIR[i])
        backupScriptPath = os.path.join(Path, "theScript.sh.bak")
        scriptPath = os.path.join(Path, "theScript.sh")

        # force option invoked:
        if args.forceMerge:

            # make a backup copy of the script first, if it doesn't already exist.
            if not os.path.isfile(backupScriptPath):
                os.system('cp -p '+scriptPath+' '+backupScriptPath)

            # get the name of merge cfg file -> either the.py or alignment_merge.py
            command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT,
                                               shell=True, universal_newlines=True)
            mergeCfg = mergeCfg.strip()

            if fire_htcondor:
                job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

            # make a backup copy of the cfg
            backupCfgPath = os.path.join(Path, mergeCfg+".bak")
            cfgPath = os.path.join(Path, mergeCfg)
            if not os.path.isfile(backupCfgPath):
                os.system('cp -p '+cfgPath+' '+backupCfgPath)

            # retrieve weights configuration
            with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                weight_conf = cPickle.load(f)

            # blank weights
            mps_tools.run_checked(["mps_weight.pl", "-c"])

            # apply weights
            for name, weight in weight_conf:
                print(" ".join(["mps_weight.pl", "-N", name, weight]))
                mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

            # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
            inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
            command = 'mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
            os.system(command)

            # rewrite theScript.sh using only 'OK' jobs
            command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
            os.system(command)

        else:
            # NOTE(review): this branch also runs for --force-merge-manual and
            # restores any backed-up script/cfg, which may undo the manual
            # edits that option relies on -- confirm this is intended
            # restore the backup copy of the script
            if os.path.isfile(backupScriptPath):
                os.system('cp -pf '+backupScriptPath+' '+scriptPath)

            # get the name of merge cfg file
            command = "cat "+scriptPath+r" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT,
                                               shell=True, universal_newlines=True)
            command = 'basename '+mergeCfg
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT,
                                               shell=True, universal_newlines=True)
            mergeCfg = mergeCfg.replace('\n','')

            if fire_htcondor:
                job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

            # restore the backup copy of the cfg
            backupCfgPath = Path+'/%s.bak' % mergeCfg
            cfgPath = Path+'/%s' % mergeCfg
            if os.path.isfile(backupCfgPath):
                os.system('cp -pf '+backupCfgPath+' '+cfgPath)

        # end of if/else forceMerge

        # submit merge job
        nMerge = i-lib.nJobs # 'index' of this merge job
        curJobName = 'm'+str(nMerge)+'_'+theJobName
        if args.forwardProxy: forward_proxy(Path)
        if fire_htcondor:
            submission = ["condor_submit",
                          "-batch-name", curJobName,
                          job_submit_file]
        else:
            # `resources` holds several options ('-q <queue> -R ...'); split
            # them shell-style so that bsub receives separate arguments
            submission = ["bsub", "-J", curJobName] + shlex.split(resources) + [scriptPath]
        for _ in range(5):
            try:
                result = subprocess.check_output(submission, stderr=subprocess.STDOUT,
                                                 universal_newlines=True)
                break
            except subprocess.CalledProcessError as e:
                result = e.output

        print(' '+result, end=' ')
        result = result.strip()

        # check if merge job was submitted and update the job database
        if fire_htcondor:
            match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
        else:
            match = re.search(r"Job <(\d+)> is submitted", result)
        if match:
            lib.JOBSTATUS[i] = 'SUBTD'
            # need standard format for job number
            lib.JOBID[i] = match.group(1) + (".0" if fire_htcondor else "")
            print("jobid is", lib.JOBID[i])
        else:
            print('Submission of merge job seems to have failed:',result, end=' ')
    # end of loop over merge jobs


lib.write_db()
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
mps_fire.int
int
Definition: mps_fire.py:177
digitizers_cfi.strip
strip
Definition: digitizers_cfi.py:19
mps_fire.write_HTCondor_submit_file_mille
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None)
Definition: mps_fire.py:125
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
mps_fire.forward_proxy
def forward_proxy(rundir)
Definition: mps_fire.py:31
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
str
#define str(s)
Definition: TestProcessor.cc:48
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
edm::print
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
mps_fire.write_HTCondor_submit_file_pede
def write_HTCondor_submit_file_pede(path, script, config, lib)
Definition: mps_fire.py:46
genParticles_cff.map
map
Definition: genParticles_cff.py:11
beamvalidation.exit
def exit(msg="")
Definition: beamvalidation.py:53