from __future__ import print_function
from builtins import range
import os
import sys
import re
import glob
import shutil
import argparse
import subprocess
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools

if sys.version_info[0] > 2:
    import _pickle as cPickle
else:
    import cPickle

32 """Forward proxy to location visible from the batch system. 35 - `rundir`: directory for storing the forwarded proxy 38 if not mps_tools.check_proxy():
39 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
42 local_proxy = subprocess.check_output([
"voms-proxy-info",
"--path"]).
decode().
strip()
43 shutil.copyfile(local_proxy, os.path.join(rundir,
".user_proxy"))
47 """Writes 'job.submit' file in `path`. 50 - `path`: job directory 51 - `script`: script to be executed 53 - `lib`: MPS lib object 56 resources = lib.get_class(
"pede").
split(
"_")[1:]
57 job_flavour = resources[-1]
59 job_submit_template=
"""\ 61 executable = {script:s} 62 output = {jobm:s}/STDOUT 63 error = {jobm:s}/STDOUT 66 transfer_output_files = "" 67 request_memory = {pedeMem:d}M 69 # adapted to space used on eos for binaries: 70 request_disk = {disk:d} 72 # adapted to threads parameter in pede options and number of available cores 73 request_cpus = {cpus:d} 75 +JobFlavour = "{flavour:s}" 77 if "bigmem" in resources:
78 job_submit_template +=
"""\ 80 +AccountingGroup = "group_u_CMS.e_cms_caf_bigmem" 82 # automatically remove the job if the submitter has no permissions to run a BigMemJob 83 periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True 85 job_submit_template +=
"\nqueue\n" 87 print(
"Determine number of pede threads...")
88 cms_process = mps_tools.get_process_object(os.path.join(Path, mergeCfg))
89 pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
91 for option
in pede_options:
92 if "threads" in option:
93 n_threads = option.replace(
"threads",
"").
strip()
94 n_threads =
max(
map(
lambda x:
int(x), n_threads.split()))
96 if n_threads > 16: n_threads = 16
100 print(
"Determine required disk space on remote host...")
103 spco = subprocess.check_output
105 cmd = [
"du",
"--apparent-size"]
106 disk_usage = [
int(item.split()[0])
107 for directory
in (
"binaries",
"monitors",
"tree_files")
110 glob.glob(opj(lib.mssDir, directory,
"*"))).
decode().splitlines()]
111 disk_usage = sum(disk_usage)
114 job_submit_file = os.path.join(Path,
"job.submit")
115 with open(job_submit_file,
"w")
as f:
116 f.write(job_submit_template.format(script = os.path.abspath(script),
117 jobm = os.path.abspath(path),
118 pedeMem = lib.pedeMem,
119 disk =
int(disk_usage),
121 flavour = job_flavour))
123 return job_submit_file
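
# For illustration, with pedeMem=32000, a disk estimate of ~5 GB, 10 pede
# threads and the "workday" flavour, the template above renders roughly to
# (all values are made-up examples, not taken from a real job):
#
#   executable            = /path/to/jobData/jobm/theScript.sh
#   output                = /path/to/jobData/jobm/STDOUT
#   error                 = /path/to/jobData/jobm/STDOUT
#   log                   = /path/to/jobData/jobm/HTCJOB
#   notification          = Always
#   transfer_output_files = ""
#   request_memory        = 32000M
#   request_disk          = 5000000
#   request_cpus          = 10
#   +JobFlavour           = "workday"
#
#   queue
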
126 """Writes 'job.submit' file in `path`. 129 - `path`: job directory 130 - `script`: script to be executed 131 - `lib`: MPS lib object 132 - `proxy_path`: path to proxy (only used in case of requested proxy forward) 135 resources = lib.get_class(
"mille").
split(
"_")[1:]
136 job_flavour = resources[-1]
138 job_submit_template=
"""\ 140 executable = {script:s} 141 output = {jobm:s}/STDOUT 142 error = {jobm:s}/STDOUT 143 log = {jobm:s}/HTCJOB 144 notification = Always 145 transfer_output_files = "" 147 +JobFlavour = "{flavour:s}" 149 if "cafalca" in resources:
150 job_submit_template +=
"""\ 152 +AccountingGroup = "group_u_CMS.CAF.ALCA" 153 # automatically remove the job if the submitter has no permissions to run a CAF Job 154 periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True 157 if proxy_path
is not None:
158 job_submit_template +=
"""\ 159 +x509userproxy = "{proxy:s}" 161 job_submit_template +=
"\nqueue\n" 163 job_submit_file = os.path.join(Path,
"job.submit")
164 with open(job_submit_file,
"w")
as f:
165 f.write(job_submit_template.format(script = os.path.abspath(script),
166 jobm = os.path.abspath(path),
167 flavour = job_flavour,
170 return job_submit_file
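
# Both writers return the path of the generated submit file; the main body
# below hands that file to 'condor_submit -batch-name <name> <file>' whenever
# the queue class configured in mps.db selects HTCondor.
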
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.")
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest="job_id", nargs="*",
                    help=("job IDs to be submitted; "
                          "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-r", "--resubmit", dest="resub", default=False,
                    action="store_true",
                    help="resubmit jobs - only works if job IDs are specified")
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help=("submit all setup Mille jobs; "
                          "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help=("submit all setup Pede jobs; "
                          "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state"))
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state. Unlike --forceMerge "
                          "this option assumes the user has edited theScript.sh and "
                          "alignment_merge.py to consistently pick up only the mille "
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
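
# Typical invocations (hypothetical examples derived from the options above):
#   mps_fire.py 3         # submit up to three Mille jobs in SETUP state
#   mps_fire.py -j 4 -r   # resubmit Mille job number 4
#   mps_fire.py -m        # submit the Pede merge job(s)
#   mps_fire.py -m -f     # force the merge although some Mille jobs are not OK
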
lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore the 'maxJobs' and '--job-id' arguments
    args.maxJobs = lib.nJobs
    args.job_id = None

if args.job_id is None and args.resub:
    print("Can only resubmit jobs if job IDs are specified")
    sys.exit(1)

if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"): job_mask.append(job_id)
        elif job_id.startswith("m"): job_mask.append("job"+job_id)
        else:
            try:
                job_mask.append(lib.JOBDIR[int(job_id)-1])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - 'job<ID>' or 'jobm<ID>'")
            print(" - '<ID>' or 'm<ID>'")
            sys.exit(1)

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# set the job name
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

fire_htcondor = False

# fire the 'normal' parallel jobs (Mille jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n  Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs
        resources = '-q'+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
        schedinfo = subprocess.check_output(["myschedd", "show"]).decode()
        if 'cafalca' in resources:
            if 'tzero' not in schedinfo:
                print("\nMPS fire: request to use CAF pool which has not been set up. Call `module load lxbatch/tzero` and try again")
                sys.exit(1)
        else:
            if 'share' not in schedinfo:
                print("\nMPS fire: request to use standard pool when CAF pool is set up. Call `module load lxbatch/share` and try again")
                sys.exit(1)
    else:
        resources = '-q '+resources
    nSub = 0  # number of submitted jobs
    for i in range(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] == 'SETUP':
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData, lib.JOBDIR[i]))

                # submit a new job and check the submission output
                if fire_htcondor:
                    Path = os.path.join(theJobData, lib.JOBDIR[i])
                    scriptPath = os.path.join(Path, "theScript.sh")
                    if args.forwardProxy:
                        job_submit_file = write_HTCondor_submit_file_mille(
                            Path, scriptPath, lib, os.path.join(Path, ".user_proxy"))
                    else:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
                    submission = "condor_submit -batch-name %s %s"%\
                        (theJobName, job_submit_file)
                else:
                    submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                        (theJobName, resources, theJobData, lib.JOBDIR[i])
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True).decode()
                except subprocess.CalledProcessError as e:
                    result = ""  # the submission check below will then fail
                print('      '+result, end=' ')
                result = result.strip()

                # check if the job was submitted and update the job database
                if fire_htcondor:
                    match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
                else:
                    match = re.search(r'Job <(\d+)> is submitted', result)
                if match:
                    # need standard format for job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                    if fire_htcondor: lib.JOBID[i] += ".0"
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i], result), end=' ')
                nSub += 1
        elif args.resub:
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData, lib.JOBDIR[i]))

                # clean up the output of the previous run before resubmitting
                try:
                    os.remove("%s/%s/HTCJOB" % (theJobData, lib.JOBDIR[i]))
                except OSError as e:
                    print("Cannot delete file %s/%s/HTCJOB :" % (theJobData, lib.JOBDIR[i]), e.strerror)
                try:
                    os.remove("%s/%s/STDOUT" % (theJobData, lib.JOBDIR[i]))
                except OSError as e:
                    print("Cannot delete file %s/%s/STDOUT :" % (theJobData, lib.JOBDIR[i]), e.strerror)
                try:
                    os.remove("%s/%s/STDOUT.gz" % (theJobData, lib.JOBDIR[i]))
                except OSError as e:
                    print("Cannot delete file %s/%s/STDOUT.gz :" % (theJobData, lib.JOBDIR[i]), e.strerror)
                try:
                    os.remove("%s/%s/alignment.log.gz" % (theJobData, lib.JOBDIR[i]))
                except OSError as e:
                    print("Cannot delete file %s/%s/alignment.log.gz :" % (theJobData, lib.JOBDIR[i]), e.strerror)
                try:
                    os.remove("%s/%s/millePedeMonitor%03d.root" % (theJobData, lib.JOBDIR[i], lib.JOBNUMBER[i]))
                except OSError as e:
                    print("Cannot delete file %s/%s/millePedeMonitor%03d.root :" % (theJobData, lib.JOBDIR[i], lib.JOBNUMBER[i]), e.strerror)

                # resubmit the job and check the submission output as above
                if fire_htcondor:
                    Path = os.path.join(theJobData, lib.JOBDIR[i])
                    scriptPath = os.path.join(Path, "theScript.sh")
                    if args.forwardProxy:
                        job_submit_file = write_HTCondor_submit_file_mille(
                            Path, scriptPath, lib, os.path.join(Path, ".user_proxy"))
                    else:
                        job_submit_file = write_HTCondor_submit_file_mille(Path, scriptPath, lib)
                    submission = "condor_submit -batch-name %s %s"%\
                        (theJobName, job_submit_file)
                else:
                    submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                        (theJobName, resources, theJobData, lib.JOBDIR[i])
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True).decode()
                except subprocess.CalledProcessError as e:
                    result = ""  # the submission check below will then fail
                print(' '+result, end=' ')
                result = result.strip()

                # check if the job was submitted and update the job database
                if fire_htcondor:
                    match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
                else:
                    match = re.search(r'Job <(\d+)> is submitted', result)
                if match:
                    # need standard format for job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                    if fire_htcondor: lib.JOBID[i] += ".0"
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i], result), end=' ')
                nSub += 1
# fire the merge job (Pede)
else:
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
        schedinfo = subprocess.check_output(["myschedd", "show"]).decode()
        if 'bigmem' in resources:
            if 'share' not in schedinfo:
                print("\nMPS fire: CAF pool is set up, but request to use high-memory machines which live in the standard pool. Call `module load lxbatch/share` and try again")
                sys.exit(1)
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # request the pede memory requirement (in MB) from LSF
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem)
    # check whether all Mille jobs are OK (or deliberately disabled)
    mergeOK = True
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break

    # loop over the merge jobs, which are stored after the Mille jobs in the database
    i = lib.nJobs
    while i < len(lib.JOBDIR):
        jobNumFrom1 = i+1

        # check if the current job is in the job mask
        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check if the current job is in SETUP mode or if submission is forced
        if lib.JOBSTATUS[i] != 'SETUP' and not args.resub:
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job', jobNumFrom1, 'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
        elif not args.resub:
            Path = os.path.join(theJobData, lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            if args.forceMerge:
                # make a backup copy of the script first, if it doesn't exist already
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of the merge cfg file
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
                mergeCfg = mergeCfg.strip()

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve the weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = cPickle.load(f)

                # blank the weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply the weights
                for name, weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the merge cfg using only 'OK' jobs, with the first Mille job as base config
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command = 'mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # restore the backup copy of the script if it exists
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of the merge cfg file
                command = "cat "+scriptPath+r" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
                mergeCfg = mergeCfg.replace('\n', '')

                # restore the backup copy of the cfg if it exists
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # submit the merge job
            nMerge = i-lib.nJobs  # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                submission = ["bsub", "-J", curJobName, resources, scriptPath]
            try:
                result = subprocess.check_output(submission, stderr=subprocess.STDOUT).decode()
            except subprocess.CalledProcessError as e:
                result = ""  # the submission check below will then fail
            print(' '+result, end=' ')
            result = result.strip()

            # check if the merge job was submitted and update the job database
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r'Job <(\d+)> is submitted', result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:', result, end=' ')
        elif args.resub:
            # remove the output of the previous run before resubmitting
            Path = os.path.join(theJobData, lib.JOBDIR[i])
            dircontents = os.listdir(Path)
            for outfile in dircontents:
                if outfile.endswith(".root"):
                    os.remove("%s/%s" % (Path, outfile))
            try:
                os.remove("%s/HTCJOB" % (Path))
            except OSError as e:
                print("Cannot delete file %s/HTCJOB :" % (Path), e.strerror)
            try:
                os.remove("%s/STDOUT" % (Path))
            except OSError as e:
                print("Cannot delete file %s/STDOUT :" % (Path), e.strerror)
            try:
                os.remove("%s/STDOUT.gz" % (Path))
            except OSError as e:
                print("Cannot delete file %s/STDOUT.gz :" % (Path), e.strerror)
            try:
                os.remove("%s/alignment.log.gz" % (Path))
            except OSError as e:
                print("Cannot delete file %s/alignment.log.gz :" % (Path), e.strerror)

            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")

            # restore the backup copy of the script if it exists
            if os.path.isfile(backupScriptPath):
                os.system('cp -pf '+backupScriptPath+' '+scriptPath)

            # get the name of the merge cfg file
            command = "cat "+scriptPath+r" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
            command = 'basename '+mergeCfg
            mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode()
            mergeCfg = mergeCfg.replace('\n', '')

            # restore the backup copy of the cfg if it exists
            backupCfgPath = Path+'/%s.bak' % mergeCfg
            cfgPath = Path+'/%s' % mergeCfg
            if os.path.isfile(backupCfgPath):
                os.system('cp -pf '+backupCfgPath+' '+cfgPath)

            # resubmit the merge job
            nMerge = i-lib.nJobs  # 'index' of this merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                submission = ["bsub", "-J", curJobName, resources, scriptPath]
            try:
                result = subprocess.check_output(submission, stderr=subprocess.STDOUT).decode()
            except subprocess.CalledProcessError as e:
                result = ""  # the submission check below will then fail
            print(' '+result, end=' ')
            result = result.strip()

            # check if the merge job was submitted and update the job database
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r'Job <(\d+)> is submitted', result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need standard format for job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:', result, end=' ')

        i += 1
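
# The updated job statuses and IDs still have to be written back to the local
# mps database; assuming the jobdatabase API used above (read_db), the
# counterpart call closing the script is:
lib.write_db()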