CMS 3D CMS Logo

mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub sytax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScrip.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 from __future__ import print_function
16 from builtins import range
17 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
18 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
19 import os
20 import sys
21 import glob
22 import shutil
23 if sys.version_info[0]>2:
24  import _pickle as cPickle
25 else:
26  import cPickle
27 import subprocess
28 import re
29 import argparse
30 
31 def forward_proxy(rundir):
32  """Forward proxy to location visible from the batch system.
33 
34  Arguments:
35  - `rundir`: directory for storing the forwarded proxy
36  """
37 
38  if not mps_tools.check_proxy():
39  print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
40  sys.exit(1)
41 
42  local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).decode().strip()
43  shutil.copyfile(local_proxy, os.path.join(rundir,".user_proxy"))
44 
45 
46 def write_HTCondor_submit_file_pede(path, script, config, lib):
47  """Writes 'job.submit' file in `path`.
48 
49  Arguments:
50  - `path`: job directory
51  - `script`: script to be executed
52  - `config`: cfg file
53  - `lib`: MPS lib object
54  """
55 
56  resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
57  job_flavour = resources[-1]
58 
59  job_submit_template="""\
60 universe = vanilla
61 executable = {script:s}
62 output = {jobm:s}/STDOUT
63 error = {jobm:s}/STDOUT
64 log = {jobm:s}/HTCJOB
65 notification = Always
66 transfer_output_files = ""
67 request_memory = {pedeMem:d}M
68 
69 # adapted to space used on eos for binaries:
70 request_disk = {disk:d}
71 
72 # adapted to threads parameter in pede options and number of available cores
73 request_cpus = {cpus:d}
74 
75 +JobFlavour = "{flavour:s}"
76 """
77  if "bigmem" in resources:
78  job_submit_template += """\
79 +BigMemJob = True
80 +AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"
81 
82 # automatically remove the job if the submitter has no permissions to run a BigMemJob
83 periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
84 """
85  job_submit_template += "\nqueue\n"
86 
87  print("Determine number of pede threads...")
88  cms_process = mps_tools.get_process_object(os.path.join(Path, mergeCfg))
89  pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
90  n_threads = 1
91  for option in pede_options:
92  if "threads" in option:
93  n_threads = option.replace("threads", "").strip()
94  n_threads = max(map(lambda x: int(x), n_threads.split()))
95  break
96  if n_threads > 16: n_threads = 16 # HTCondor machines have (currently) 16
97  # cores, i.e. we ensure here that the job
98  # would fit core-wise on one machine
99 
100  print("Determine required disk space on remote host...")
101  # determine usage by each file instead of whole directory as this is what
102  # matters for the specified disk usage:
103  spco = subprocess.check_output # to make code below more less verbose
104  opj = os.path.join # dito
105  cmd = ["du", "--apparent-size"]
106  disk_usage = [int(item.split()[0])
107  for directory in ("binaries", "monitors", "tree_files")
108  for item
109  in spco(cmd+
110  glob.glob(opj(lib.mssDir, directory, "*"))).decode().splitlines()]
111  disk_usage = sum(disk_usage)
112  disk_usage *= 1.1 # reserve 10% additional space
113 
114  job_submit_file = os.path.join(Path, "job.submit")
115  with open(job_submit_file, "w") as f:
116  f.write(job_submit_template.format(script = os.path.abspath(script),
117  jobm = os.path.abspath(path),
118  pedeMem = lib.pedeMem,
119  disk = int(disk_usage),
120  cpus = n_threads,
121  flavour = job_flavour))
122 
123  return job_submit_file
124 
125 def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
126  """Writes 'job.submit' file in `path`.
127 
128  Arguments:
129  - `path`: job directory
130  - `script`: script to be executed
131  - `lib`: MPS lib object
132  - `proxy_path`: path to proxy (only used in case of requested proxy forward)
133  """
134 
135  resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
136  job_flavour = resources[-1]
137 
138  job_submit_template="""\
139 universe = vanilla
140 executable = {script:s}
141 output = {jobm:s}/STDOUT
142 error = {jobm:s}/STDOUT
143 log = {jobm:s}/HTCJOB
144 notification = Always
145 transfer_output_files = ""
146 
147 +JobFlavour = "{flavour:s}"
148 """
149  if "cafalca" in resources:
150  job_submit_template += """\
151 +CAFJob = True
152 +AccountingGroup = "group_u_CMS.CAF.ALCA"
153 # automatically remove the job if the submitter has no permissions to run a CAF Job
154 periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True
155 """
156 
157  if proxy_path is not None:
158  job_submit_template += """\
159 +x509userproxy = "{proxy:s}"
160 """
161  job_submit_template += "\nqueue\n"
162 
163  job_submit_file = os.path.join(Path, "job.submit")
164  with open(job_submit_file, "w") as f:
165  f.write(job_submit_template.format(script = os.path.abspath(script),
166  jobm = os.path.abspath(path),
167  flavour = job_flavour,
168  proxy = proxy_path))
169 
170  return job_submit_file
171 
172 
173 
174 parser = argparse.ArgumentParser(
175  description="Submit jobs that are setup in local mps database to batch system.",
176 )
177 parser.add_argument("maxJobs", type=int, nargs='?', default=1,
178  help="number of Mille jobs to be submitted (default: %(default)d)")
179 parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
180  help = ("job IDs to be submitted; "
181  "use either 'job<ID>' or directly '<ID>'"))
182 parser.add_argument("-r", "--resubmit", dest = "resub", default=False,
183  action="store_true",
184  help = ("resubmit jobs - only works if job IDs are specified"))
185 parser.add_argument("-a", "--all", dest="allMille", default=False,
186  action="store_true",
187  help = ("submit all setup Mille jobs; "
188  "maxJobs and --job-id are ignored"))
189 parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
190  action="store_true",
191  help = ("submit all setup Pede jobs; "
192  "maxJobs is ignored, but --job-id is respected"))
193 parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
194  action="store_true",
195  help=("force the submission of the Pede job in case some "+
196  "Mille jobs are not in the OK state"))
197 parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
198  action="store_true",
199  help=("force the submission of the Pede job in case some "+
200  "Mille jobs are not in the OK state. Unlike --forceMerge "+
201  "this option assumes the user has edited theScript.sh and "+
202  "alignment_merge.py to consistently pick up only the mille "+
203  "output files that exist"))
204 parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
205  action="store_true",
206  help="forward VOMS proxy to batch system")
207 args = parser.parse_args(sys.argv[1:])
208 
209 
210 lib = mpslib.jobdatabase()
211 lib.read_db()
212 
213 if args.allMille:
214  # submit all Mille jobs and ignore 'maxJobs' supplied by user
215  args.maxJobs = lib.nJobs
216  args.job_id = None
217 
218 if args.job_id is None and args.resub:
219  print("Can only resubmit jobs if job IDs are specified")
220  sys.exit(1)
221 
222 
223 if args.job_id is None:
224  job_mask = lib.JOBDIR
225 else:
226  job_mask = []
227  for job_id in args.job_id:
228  invalid_id = False
229  if job_id.startswith("job"): job_mask.append(job_id)
230  elif job_id.startswith("m"): job_mask.append("job"+job_id)
231  else:
232  try:
233  job_mask.append(lib.JOBDIR[int(job_id)-1])
234  except ValueError:
235  invalid_id = True
236  except IndexError:
237  print("ID provided to '-j/--job-id' is out of range:", job_id)
238  sys.exit(1)
239 
240  if invalid_id or job_mask[-1] not in lib.JOBDIR:
241  print("ID provided to '-j/--job-id' is invalid:", job_id)
242  print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
243  print("of the following formats:")
244  print(" - job042")
245  print(" - 042")
246  print(" - jobm1")
247  print(" - m1")
248  sys.exit(1)
249 
250 # build the absolute job directory path (needed by mps_script)
251 theJobData = os.path.join(os.getcwd(), "jobData")
252 
253 # set the job name ???????????????????
254 theJobName = 'mpalign'
255 if lib.addFiles != '':
256  theJobName = lib.addFiles
257 
258 fire_htcondor = False
259 
260 # fire the 'normal' parallel Jobs (Mille Jobs)
261 if not args.fireMerge:
262  #set the resources string coming from mps.db
263  resources = lib.get_class('mille')
264 
265  # "cmscafspec" found in $resources: special cmscaf resources
266  if 'cmscafspec' in resources:
267  print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
268  resources = '-q cmscafalcamille'
269  # "cmscaf" found in $resources
270  elif 'cmscaf' in resources:
271  # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
272  resources = '-q'+resources+' -m g_cmscaf'
273  elif "htcondor" in resources:
274  fire_htcondor = True
275  schedinfo = subprocess.check_output(["myschedd","show"]).decode()
276  if 'cafalca' in resources:
277  if not 'tzero' in schedinfo:
278  print("\nMPS fire: request to use CAF pool which has not been set up. Call `module load lxbatch/tzero` and try again")
279  exit(1)
280  else:
281  if not 'share' in schedinfo:
282  print("\nMPS fire: request to use standard pool when CAF pool is set up. Call `module load lxbatch/share` and try again")
283  exit(1)
284  else:
285  resources = '-q '+resources
286 
287  nSub = 0 # number of submitted Jobs
288  for i in range(lib.nJobs):
289  if lib.JOBDIR[i] not in job_mask: continue
290  if lib.JOBSTATUS[i] == 'SETUP':
291  if nSub < args.maxJobs:
292  if args.forwardProxy:
293  forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))
294 
295  # submit a new job with 'bsub -J ...' and check output
296  # for some reasons LSF wants script with full path
297  if fire_htcondor:
298  Path = os.path.join(theJobData,lib.JOBDIR[i])
299  scriptPath = os.path.join(Path, "theScript.sh")
300  if args.forwardProxy:
301  job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib,os.path.join(Path,".user_proxy"))
302  else:
303  job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
304  submission = "condor_submit -batch-name %s %s"%\
305  (theJobName, job_submit_file)
306  else:
307  submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
308  (theJobName, resources, theJobData, lib.JOBDIR[i])
309  print(submission)
310  try:
311  result = subprocess.check_output(submission,
312  stderr=subprocess.STDOUT,
313  shell=True).decode()
314  except subprocess.CalledProcessError as e:
315  result = "" # -> check for successful job submission will fail
316  #print(' '+result, end=' ')
317  print(result)
318  result = result.strip()
319 
320  # check if job was submitted and updating jobdatabase
321  if fire_htcondor:
322  match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
323  else:
324  match = re.search('Job <(\d+)> is submitted', result)
325  if match:
326  # need standard format for job number
327  lib.JOBSTATUS[i] = 'SUBTD'
328  lib.JOBID[i] = match.group(1)
329  if fire_htcondor: lib.JOBID[i] += ".0"
330  else:
331  print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
332  nSub +=1
333 
334  elif args.resub:
335  if nSub < args.maxJobs:
336  if args.forwardProxy:
337  forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))
338 
339  try:
340  os.remove("%s/%s/HTCJOB" % (theJobData, lib.JOBDIR[i]))
341  except OSError as e:
342  print("Cannot delete file %s/%s/HTCJOB :" % (theJobData,lib.JOBDIR[i]), e.strerror)
343  try:
344  os.remove("%s/%s/STDOUT" % (theJobData, lib.JOBDIR[i]))
345  except OSError as e:
346  print("Cannot delete file %s/%s/STDOUT :" % (theJobData,lib.JOBDIR[i]), e.strerror)
347  try:
348  os.remove("%s/%s/STDOUT.gz" % (theJobData, lib.JOBDIR[i]))
349  except OSError as e:
350  print("Cannot delete file %s/%s/STDOUT.gz :" % (theJobData,lib.JOBDIR[i]), e.strerror)
351  try:
352  os.remove("%s/%s/alignment.log.gz" % (theJobData, lib.JOBDIR[i]))
353  except OSError as e:
354  print("Cannot delete file %s/%s/alignment.log.gz :" % (theJobData,lib.JOBDIR[i]), e.strerror)
355  try:
356  os.remove("%s/%s/millePedeMonitor%03d.root" % (theJobData, lib.JOBDIR[i], lib.JOBNUMBER[i]))
357  except OSError as e:
358  print("Cannot delete file %s/%s/millePedeMonitor%03d.root :" % (theJobData,lib.JOBDIR[i],lib.JOBNUMBER[i]), e.strerror)
359 
360  # submit a new job with 'bsub -J ...' and check output
361  # for some reasons LSF wants script with full path
362  if fire_htcondor:
363  Path = os.path.join(theJobData,lib.JOBDIR[i])
364  scriptPath = os.path.join(Path, "theScript.sh")
365  if args.forwardProxy:
366  job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib,os.path.join(Path,".user_proxy"))
367  else:
368  job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
369  submission = "condor_submit -batch-name %s %s"%\
370  (theJobName, job_submit_file)
371  else:
372  submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
373  (theJobName, resources, theJobData, lib.JOBDIR[i])
374  print(submission)
375  try:
376  result = subprocess.check_output(submission,
377  stderr=subprocess.STDOUT,
378  shell=True).decode()
379  except subprocess.CalledProcessError as e:
380  result = "" # -> check for successful job submission will fail
381  print(' '+result, end=' ')
382  result = result.strip()
383 
384  # check if job was submitted and updating jobdatabase
385  if fire_htcondor:
386  match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
387  else:
388  match = re.search('Job <(\d+)> is submitted', result)
389  if match:
390  # need standard format for job number
391  lib.JOBSTATUS[i] = 'SUBTD'
392  lib.JOBID[i] = match.group(1)
393  if fire_htcondor: lib.JOBID[i] += ".0"
394  else:
395  print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
396  nSub +=1
397 
398 
399 # fire the merge job
400 else:
401  print('fire merge')
402  # set the resources string coming from mps.db
403  resources = lib.get_class('pede')
404  if 'cmscafspec' in resources:
405  resources = '-q cmscafalcamille'
406  elif "htcondor" in resources:
407  fire_htcondor = True
408  schedinfo = subprocess.check_output(["myschedd","show"]).decode()
409  if 'bigmem' in resources:
410  if not 'share' in schedinfo:
411  print("\nMPS fire: CAF pool is set up, but request to use high-memory machines which live in the standard pool. Call `module load lxbatch/share` and try again")
412  exit(1)
413  else:
414  resources = '-q '+resources
415 
416  if not fire_htcondor:
417  # Allocate memory for pede job FIXME check documentation for bsub!!!!!
418  resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl
419 
420  # check whether all other jobs are OK
421  mergeOK = True
422  for i in range(lib.nJobs):
423  if lib.JOBSTATUS[i] != 'OK':
424  if 'DISABLED' not in lib.JOBSTATUS[i]:
425  mergeOK = False
426  break
427 
428  # loop over merge jobs
429  i = lib.nJobs
430  while i<len(lib.JOBDIR):
431  jobNumFrom1 = i+1
432  if lib.JOBDIR[i] not in job_mask:
433  i += 1
434  continue
435 
436  # check if current job in SETUP mode or if forced
437  if lib.JOBSTATUS[i] != 'SETUP' and not args.resub:
438  print('Merge job %d status %s not submitted.' % \
439  (jobNumFrom1, lib.JOBSTATUS[i]))
440  elif not (mergeOK or args.forceMerge or args.forceMergeManual):
441  print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
442  elif not args.resub:
443  # some paths for clarity
444  Path = os.path.join(theJobData,lib.JOBDIR[i])
445  backupScriptPath = os.path.join(Path, "theScript.sh.bak")
446  scriptPath = os.path.join(Path, "theScript.sh")
447 
448  # force option invoked:
449  if args.forceMerge:
450 
451  # make a backup copy of the script first, if it doesn't already exist.
452  if not os.path.isfile(backupScriptPath):
453  os.system('cp -p '+scriptPath+' '+backupScriptPath)
454 
455  # get the name of merge cfg file -> either the.py or alignment_merge.py
456  command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
457  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
458  mergeCfg = mergeCfg.strip()
459 
460  if fire_htcondor:
461  job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)
462 
463  # make a backup copy of the cfg
464  backupCfgPath = os.path.join(Path, mergeCfg+".bak")
465  cfgPath = os.path.join(Path, mergeCfg)
466  if not os.path.isfile(backupCfgPath):
467  os.system('cp -p '+cfgPath+' '+backupCfgPath)
468 
469  # retrieve weights configuration
470  with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
471  weight_conf = cPickle.load(f)
472 
473  # blank weights
474  mps_tools.run_checked(["mps_weight.pl", "-c"])
475 
476  # apply weights
477  for name,weight in weight_conf:
478  print(" ".join(["mps_weight.pl", "-N", name, weight]))
479  mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])
480 
481  # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
482  inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
483  command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
484  os.system(command)
485 
486  # rewrite theScript.sh using inly 'OK' jobs
487  command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
488  os.system(command)
489 
490  else:
491  # restore the backup copy of the script
492  if os.path.isfile(backupScriptPath):
493  os.system('cp -pf '+backupScriptPath+' '+scriptPath)
494 
495  # get the name of merge cfg file
496  command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
497  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
498  command = 'basename '+mergeCfg
499  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
500  mergeCfg = mergeCfg.replace('\n','')
501 
502  if fire_htcondor:
503  job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)
504 
505  # restore the backup copy of the cfg
506  backupCfgPath = Path+'/%s.bak' % mergeCfg
507  cfgPath = Path+'/%s' % mergeCfg
508  if os.path.isfile(backupCfgPath):
509  os.system('cp -pf '+backupCfgPath+' '+cfgPath)
510 
511  # end of if/else forceMerge
512 
513  # submit merge job
514  nMerge = i-lib.nJobs # 'index' of this merge job
515  curJobName = 'm'+str(nMerge)+'_'+theJobName
516  if args.forwardProxy: forward_proxy(Path)
517  if fire_htcondor:
518  submission = ["condor_submit",
519  "-batch-name", curJobName,
520  job_submit_file]
521  else:
522  submission = ["bsub", "-J", curJobName, resources, scriptPath]
523  for _ in range(5):
524  try:
525  result = subprocess.check_output(submission, stderr=subprocess.STDOUT).decode()
526  break
527  except subprocess.CalledProcessError as e:
528  result = e.output
529 
530  print(' '+result, end=' ')
531  result = result.strip()
532 
533  # check if merge job was submitted and updating jobdatabase
534  if fire_htcondor:
535  match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
536  else:
537  match = re.search('Job <(\d+)> is submitted', result)
538  if match:
539  lib.JOBSTATUS[i] = 'SUBTD'
540  lib.JOBID[i] = match.group(1)
541  # need standard format for job number
542  if fire_htcondor: lib.JOBID[i] += ".0"
543  print("jobid is", lib.JOBID[i])
544  else:
545  print('Submission of merge job seems to have failed:',result, end=' ')
546 
547  elif args.resub:
548  # some paths for clarity
549  Path = os.path.join(theJobData,lib.JOBDIR[i])
550  dircontents = os.listdir(Path)
551  for outfile in dircontents:
552  if outfile.endswith(".root"):
553  os.remove("%s/%s" %(Path, outfile))
554  try:
555  os.remove("%s/HTCJOB" % (Path))
556  except OSError as e:
557  print("Cannot delete file %s/HTCJOB :" % (Path), e.strerror)
558  try:
559  os.remove("%s/STDOUT" % (Path))
560  except OSError as e:
561  print("Cannot delete file %s/STDOUT :" % (Path), e.strerror)
562  try:
563  os.remove("%s/STDOUT.gz" % (Path))
564  except OSError as e:
565  print("Cannot delete file %s/STDOUT.gz :" % (Path), e.strerror)
566  try:
567  os.remove("%s/alignment.log.gz" % (Path))
568  except OSError as e:
569  print("Cannot delete file %s/alignment.log.gz :" % (Path), e.strerror)
570 
571 
572  backupScriptPath = os.path.join(Path, "theScript.sh.bak")
573  scriptPath = os.path.join(Path, "theScript.sh")
574 
575  # restore the backup copy of the script
576  if os.path.isfile(backupScriptPath):
577  os.system('cp -pf '+backupScriptPath+' '+scriptPath)
578 
579  # get the name of merge cfg file
580  command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
581  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
582  command = 'basename '+mergeCfg
583  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
584  mergeCfg = mergeCfg.replace('\n','')
585 
586  if fire_htcondor:
587  job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)
588 
589  # restore the backup copy of the cfg
590  backupCfgPath = Path+'/%s.bak' % mergeCfg
591  cfgPath = Path+'/%s' % mergeCfg
592  if os.path.isfile(backupCfgPath):
593  os.system('cp -pf '+backupCfgPath+' '+cfgPath)
594 
595 
596  # submit merge job
597  nMerge = i-lib.nJobs # 'index' of this merge job
598  curJobName = 'm'+str(nMerge)+'_'+theJobName
599  if args.forwardProxy: forward_proxy(Path)
600  if fire_htcondor:
601  submission = ["condor_submit",
602  "-batch-name", curJobName,
603  job_submit_file]
604  else:
605  submission = ["bsub", "-J", curJobName, resources, scriptPath]
606  for _ in range(5):
607  try:
608  result = subprocess.check_output(submission, stderr=subprocess.STDOUT).decode()
609  break
610  except subprocess.CalledProcessError as e:
611  result = e.output
612 
613  print(' '+result, end=' ')
614  result = result.strip()
615 
616  # check if merge job was submitted and updating jobdatabase
617  if fire_htcondor:
618  match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
619  else:
620  match = re.search('Job <(\d+)> is submitted', result)
621  if match:
622  lib.JOBSTATUS[i] = 'SUBTD'
623  lib.JOBID[i] = match.group(1)
624  # need standard format for job number
625  if fire_htcondor: lib.JOBID[i] += ".0"
626  print("jobid is", lib.JOBID[i])
627  else:
628  print('Submission of merge job seems to have failed:',result, end=' ')
629 
630  i +=1
631 
632  # end of while on merge jobs
633 
634 
635 lib.write_db()
def forward_proxy(rundir)
Definition: mps_fire.py:31
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None)
Definition: mps_fire.py:125
def write_HTCondor_submit_file_pede(path, script, config, lib)
Definition: mps_fire.py:46
bool decode(bool &, std::string_view)
Definition: types.cc:72
#define str(s)
def exit(msg="")