CMS 3D CMS Logo

mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 from __future__ import print_function
16 from builtins import range
17 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
18 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
19 import os
20 import sys
21 import glob
22 import shutil
23 if sys.version_info[0]>2:
24  import _pickle as cPickle
25 else:
26  import cPickle
27 import subprocess
28 import re
29 import argparse
30 
def forward_proxy(rundir):
    """Copy the user's VOMS proxy into `rundir` as '.user_proxy'.

    Arguments:
    - `rundir`: directory that receives the copy of the proxy file

    The proxy location is taken from 'voms-proxy-info --path'. If
    `mps_tools.check_proxy()` returns a false value, a hint on how to create
    a proxy is printed and the script exits with status 1.
    """

    if mps_tools.check_proxy():
        proxy_query = subprocess.check_output(["voms-proxy-info", "--path"])
        proxy_source = proxy_query.decode().strip()
        proxy_target = os.path.join(rundir, ".user_proxy")
        shutil.copyfile(proxy_source, proxy_target)
        return

    print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
    sys.exit(1)
44 
45 
def write_HTCondor_submit_file_pede(path, script, config, lib):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `config`: cfg file (name relative to `path`)
    - `lib`: MPS lib object

    Returns the path of the written submit file.

    The function only uses its own arguments; it does not rely on
    module-level variables set by the caller.
    """

    resources = lib.get_class("pede").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""
request_memory = {pedeMem:d}M

# adapted to space used on eos for binaries:
request_disk = {disk:d}

# adapted to threads parameter in pede options and number of available cores
request_cpus = {cpus:d}

+JobFlavour = "{flavour:s}"
"""
    if "bigmem" in resources:
        job_submit_template += """\
+BigMemJob = True
+AccountingGroup = "group_u_CMS.e_cms_caf_bigmem"

# automatically remove the job if the submitter has no permissions to run a BigMemJob
periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True
"""
    job_submit_template += "\nqueue\n"

    print("Determine number of pede threads...")
    cms_process = mps_tools.get_process_object(os.path.join(path, config))
    pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
    n_threads = 1
    for option in pede_options:
        if "threads" in option:
            # the option may list several numbers; request the largest one
            thread_counts = option.replace("threads", "").split()
            n_threads = max(int(count) for count in thread_counts)
            break
    # HTCondor machines have (currently) 16 cores, i.e. we ensure here that
    # the job would fit core-wise on one machine
    n_threads = min(n_threads, 16)

    print("Determine required disk space on remote host...")
    # determine usage by each file instead of whole directory as this is what
    # matters for the specified disk usage:
    du_command = ["du", "--apparent-size"]
    disk_usage = 0
    for directory in ("binaries", "monitors", "tree_files"):
        files = glob.glob(os.path.join(lib.mssDir, directory, "*"))
        if not files:
            # 'du' without operands would measure the current working
            # directory instead, which is not what is wanted here
            continue
        du_output = subprocess.check_output(du_command + files).decode()
        disk_usage += sum(int(line.split()[0])
                          for line in du_output.splitlines())
    disk_usage *= 1.1 # reserve 10% additional space

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           pedeMem = lib.pedeMem,
                                           disk = int(disk_usage),
                                           cpus = n_threads,
                                           flavour = job_flavour))

    return job_submit_file
124 
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None):
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `script`: script to be executed
    - `lib`: MPS lib object
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the path of the written submit file.

    The function only uses its own arguments; it does not rely on
    module-level variables set by the caller.
    """

    resources = lib.get_class("mille").split("_")[1:] # strip off 'htcondor'
    job_flavour = resources[-1]

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/STDOUT
error = {jobm:s}/STDOUT
log = {jobm:s}/HTCJOB
notification = Always
transfer_output_files = ""

+JobFlavour = "{flavour:s}"
"""
    if "cafalca" in resources:
        job_submit_template += """\
+CAFJob = True
+AccountingGroup = "group_u_CMS.CAF.ALCA"
# automatically remove the job if the submitter has no permissions to run a CAF Job
periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True
"""

    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += "\nqueue\n"

    job_submit_file = os.path.join(path, "job.submit")
    with open(job_submit_file, "w") as f:
        # 'proxy' is only consumed if the x509userproxy line was appended above
        f.write(job_submit_template.format(script = os.path.abspath(script),
                                           jobm = os.path.abspath(path),
                                           flavour = job_flavour,
                                           proxy = proxy_path))

    return job_submit_file
171 
172 
173 
# Command line interface. The 'dest' names below are read by the rest of
# the script (args.maxJobs, args.job_id, args.resub, ...).
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest = "job_id", nargs = "*",
                    help = ("job IDs to be submitted; "
                            "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-r", "--resubmit", dest = "resub", default=False,
                    action="store_true",
                    help = ("resubmit jobs - only works if job IDs are specified"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help = ("submit all setup Mille jobs; "
                            "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help = ("submit all setup Pede jobs; "
                            "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
# The help text names the flag as it is actually spelled on the command
# line ('--force-merge'); there is no '--forceMerge' option.
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state. Unlike --force-merge "+
                          "this option assumes the user has edited theScript.sh and "+
                          "alignment_merge.py to consistently pick up only the mille "+
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
208 
209 
# Load the local MPS job database.
lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by user
    args.maxJobs = lib.nJobs
    args.job_id = None

if args.job_id is None and args.resub:
    print("Can only resubmit jobs if job IDs are specified")
    sys.exit(1)


# 'job_mask' holds the job directory names that may be submitted in this run.
if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"):
            job_mask.append(job_id)
        elif job_id.startswith("m"):
            job_mask.append("job"+job_id)
        else:
            try:
                job_index = int(job_id) - 1   # IDs on the command line start at 1
            except ValueError:
                invalid_id = True
            else:
                # Reject '0' and negative numbers explicitly: a negative list
                # index would silently select a job counted from the end.
                if not 0 <= job_index < len(lib.JOBDIR):
                    print("ID provided to '-j/--job-id' is out of range:", job_id)
                    sys.exit(1)
                job_mask.append(lib.JOBDIR[job_index])

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - job042")
            print(" - 042")
            print(" - jobm1")
            print(" - m1")
            sys.exit(1)

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# batch job name: defaults to 'mpalign' unless the database provides
# a non-empty 'addFiles' entry
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

# switched on below once the resource class requests HTCondor
fire_htcondor = False
259 
260 # fire the 'normal' parallel Jobs (Mille Jobs)
# ---------------------------------------------------------------------------
# Helpers shared by the Mille and the Pede submission paths
# ---------------------------------------------------------------------------

def _find_job_id(output, htcondor):
    """Return the regex match holding the batch job ID in `output`, or None."""
    if htcondor:
        return re.search(r"1 job\(s\) submitted to cluster (\d+)\.", output)
    return re.search(r'Job <(\d+)> is submitted', output)


def _mark_submitted(index, match, htcondor):
    """Store the successful submission of job `index` in the job database."""
    lib.JOBSTATUS[index] = 'SUBTD'
    lib.JOBID[index] = match.group(1)
    # need standard format for job number
    if htcondor: lib.JOBID[index] += ".0"


def _remove_reporting_failures(file_paths):
    """Delete each file; a failure is reported on stdout but is not fatal."""
    for file_path in file_paths:
        try:
            os.remove(file_path)
        except OSError as e:
            print("Cannot delete file %s :" % file_path, e.strerror)


def _submit_with_retries(submission, attempts=5):
    """Run `submission` (argument list) until it succeeds, at most `attempts` times.

    Returns the decoded output of the successful call, or the decoded output
    of the last failed call if all attempts failed.
    """
    output = ""
    for _ in range(attempts):
        try:
            return subprocess.check_output(submission,
                                           stderr=subprocess.STDOUT).decode()
        except subprocess.CalledProcessError as e:
            # e.output is bytes; decode it so that it can be printed and
            # searched like the output of a successful call
            output = e.output.decode(errors="replace")
    return output


# NOTE: `Path` and `mergeCfg` are deliberately kept as module-level names with
# exactly these spellings; the submit-file writers have historically looked
# them up as globals.

# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    #set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in $resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in $resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q'+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
        schedinfo = subprocess.check_output(["myschedd","show"]).decode()
        if 'cafalca' in resources:
            if 'tzero' not in schedinfo:
                print("\nMPS fire: request to use CAF pool which has not been set up. Call `module load lxbatch/tzero` and try again")
                sys.exit(1)
        else:
            if 'share' not in schedinfo:
                print("\nMPS fire: request to use standard pool when CAF pool is set up. Call `module load lxbatch/share` and try again")
                sys.exit(1)
    else:
        resources = '-q '+resources

    nSub = 0 # number of submitted Jobs
    for i in range(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue

        # jobs in SETUP state are submitted; any other state only on resubmit
        fresh_job = lib.JOBSTATUS[i] == 'SETUP'
        if not (fresh_job or args.resub): continue
        if nSub >= args.maxJobs: continue

        Path = os.path.join(theJobData,lib.JOBDIR[i])
        if args.forwardProxy:
            forward_proxy(Path)

        if not fresh_job:
            # resubmission: clear the leftovers of the previous attempt
            stale_names = ("HTCJOB",
                           "STDOUT",
                           "STDOUT.gz",
                           "alignment.log.gz",
                           "millePedeMonitor%03d.root" % lib.JOBNUMBER[i])
            _remove_reporting_failures(["%s/%s/%s" % (theJobData, lib.JOBDIR[i], name)
                                        for name in stale_names])

        # submit a new job with 'bsub -J ...' and check output
        # for some reasons LSF wants script with full path
        if fire_htcondor:
            scriptPath = os.path.join(Path, "theScript.sh")
            if args.forwardProxy:
                job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib,os.path.join(Path,".user_proxy"))
            else:
                job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
            submission = "condor_submit -batch-name %s %s"%\
                         (theJobName, job_submit_file)
        else:
            submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                         (theJobName, resources, theJobData, lib.JOBDIR[i])
        print(submission)
        try:
            result = subprocess.check_output(submission,
                                             stderr=subprocess.STDOUT,
                                             shell=True).decode()
        except subprocess.CalledProcessError as e:
            # show what the batch system complained about ...
            print(e.output.decode(errors="replace"), end=' ')
            result = "" # ... -> check for successful job submission will fail
        if fresh_job:
            print(result)
        else:
            print(' '+result, end=' ')
        result = result.strip()

        # check if job was submitted and updating jobdatabase
        match = _find_job_id(result, fire_htcondor)
        if match:
            _mark_submitted(i, match, fire_htcondor)
        else:
            print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result), end=' ')
        nSub +=1


# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
        schedinfo = subprocess.check_output(["myschedd","show"]).decode()
        if 'bigmem' in resources:
            if 'share' not in schedinfo:
                print("\nMPS fire: CAF pool is set up, but request to use high-memory machines which live in the standard pool. Call `module load lxbatch/share` and try again")
                sys.exit(1)
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # Allocate memory for pede job FIXME check documentation for bsub!!!!!
        # NOTE(review): the whole resource string is later passed to bsub as
        # ONE argument of an argument list - confirm this is what LSF expects.
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl

    # check whether all other jobs are OK (disabled jobs do not count)
    mergeOK = all(lib.JOBSTATUS[j] == 'OK' or 'DISABLED' in lib.JOBSTATUS[j]
                  for j in range(lib.nJobs))

    # loop over merge jobs (stored behind the Mille jobs in the database)
    for i in range(lib.nJobs, len(lib.JOBDIR)):
        jobNumFrom1 = i+1
        if lib.JOBDIR[i] not in job_mask:
            continue

        # check if current job in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP' and not args.resub:
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
            continue
        if not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
            continue

        # some paths for clarity
        Path = os.path.join(theJobData,lib.JOBDIR[i])
        backupScriptPath = os.path.join(Path, "theScript.sh.bak")
        scriptPath = os.path.join(Path, "theScript.sh")

        if args.resub:
            # resubmission: clear the output of the previous attempt
            for outfile in os.listdir(Path):
                if outfile.endswith(".root"):
                    os.remove("%s/%s" %(Path, outfile))
            _remove_reporting_failures(["%s/%s" % (Path, name)
                                        for name in ("HTCJOB",
                                                     "STDOUT",
                                                     "STDOUT.gz",
                                                     "alignment.log.gz")])

        # force option invoked (a resubmission always takes the 'restore' path):
        if args.forceMerge and not args.resub:

            # make a backup copy of the script first, if it doesn't already exist.
            if not os.path.isfile(backupScriptPath):
                os.system('cp -p '+scriptPath+' '+backupScriptPath)

            # get the name of merge cfg file -> either the.py or alignment_merge.py
            command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
            mergeCfg = mergeCfg.strip()

            if fire_htcondor:
                job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

            # make a backup copy of the cfg
            backupCfgPath = os.path.join(Path, mergeCfg+".bak")
            cfgPath = os.path.join(Path, mergeCfg)
            if not os.path.isfile(backupCfgPath):
                os.system('cp -p '+cfgPath+' '+backupCfgPath)

            # retrieve weights configuration
            # NOTE(review): unpickling executes arbitrary code if the file is
            # not trustworthy; '.weights.pkl' is expected to be a local file
            # written by the MPS setup - confirm.
            with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                weight_conf = cPickle.load(f)

            # blank weights
            mps_tools.run_checked(["mps_weight.pl", "-c"])

            # apply weights
            for name,weight in weight_conf:
                print(" ".join(["mps_weight.pl", "-N", name, weight]))
                mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

            # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
            inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
            command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
            os.system(command)

            # rewrite theScript.sh using only 'OK' jobs
            command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
            os.system(command)

        else:
            # restore the backup copy of the script
            if os.path.isfile(backupScriptPath):
                os.system('cp -pf '+backupScriptPath+' '+scriptPath)

            # get the name of merge cfg file
            command = "cat "+scriptPath+r" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
            command = 'basename '+mergeCfg
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
            mergeCfg = mergeCfg.replace('\n','')

            # the submit file is written before the cfg backup is restored
            # (same order as always)
            if fire_htcondor:
                job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

            # restore the backup copy of the cfg
            backupCfgPath = Path+'/%s.bak' % mergeCfg
            cfgPath = Path+'/%s' % mergeCfg
            if os.path.isfile(backupCfgPath):
                os.system('cp -pf '+backupCfgPath+' '+cfgPath)

        # end of if/else forceMerge

        # submit merge job
        nMerge = i-lib.nJobs # 'index' of this merge job
        curJobName = 'm'+str(nMerge)+'_'+theJobName
        if args.forwardProxy: forward_proxy(Path)
        if fire_htcondor:
            submission = ["condor_submit",
                          "-batch-name", curJobName,
                          job_submit_file]
        else:
            submission = ["bsub", "-J", curJobName, resources, scriptPath]
        result = _submit_with_retries(submission)

        print(' '+result, end=' ')
        result = result.strip()

        # check if merge job was submitted and updating jobdatabase
        match = _find_job_id(result, fire_htcondor)
        if match:
            _mark_submitted(i, match, fire_htcondor)
            print("jobid is", lib.JOBID[i])
        else:
            print('Submission of merge job seems to have failed:',result, end=' ')

    # end of loop on merge jobs


lib.write_db()
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
mps_fire.int
int
Definition: mps_fire.py:177
digitizers_cfi.strip
strip
Definition: digitizers_cfi.py:19
mps_fire.write_HTCondor_submit_file_mille
def write_HTCondor_submit_file_mille(path, script, lib, proxy_path=None)
Definition: mps_fire.py:125
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
mps_fire.forward_proxy
def forward_proxy(rundir)
Definition: mps_fire.py:31
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
str
#define str(s)
Definition: TestProcessor.cc:53
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
edm::decode
bool decode(bool &, std::string const &)
Definition: types.cc:72
mps_fire.write_HTCondor_submit_file_pede
def write_HTCondor_submit_file_pede(path, script, config, lib)
Definition: mps_fire.py:46
genParticles_cff.map
map
Definition: genParticles_cff.py:11
beamvalidation.exit
def exit(msg="")
Definition: beamvalidation.py:52