from __future__ import print_function
from builtins import range
import os
import sys
import re
import glob
import shutil
import argparse
import subprocess
try:
    import cPickle              # Python 2
except ImportError:
    import pickle as cPickle    # Python 3 fallback

import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
29 """Forward proxy to location visible from the batch system. 32 - `rundir`: directory for storing the forwarded proxy 35 if not mps_tools.check_proxy():
36 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
39 local_proxy = subprocess.check_output([
"voms-proxy-info",
"--path"]).
strip()
40 shutil.copyfile(local_proxy, os.path.join(rundir,
".user_proxy"))
44 """Writes 'job.submit' file in `path`. 47 - `path`: job directory 48 - `script`: script to be executed 50 - `lib`: MPS lib object 53 resources = lib.get_class(
"pede").
split(
"_")[1:]
54 job_flavour = resources[-1]
56 job_submit_template=
"""\ 58 executable = {script:s} 59 output = {jobm:s}/STDOUT 60 error = {jobm:s}/STDOUT 63 transfer_output_files = "" 64 request_memory = {pedeMem:d}M 66 # adapted to space used on eos for binaries: 67 request_disk = {disk:d} 69 # adapted to threads parameter in pede options and number of available cores 70 request_cpus = {cpus:d} 72 +JobFlavour = "{flavour:s}" 74 if "bigmem" in resources:
75 job_submit_template +=
"""\ 77 +AccountingGroup = "group_u_CMS.e_cms_caf_bigmem" 79 # automatically remove the job if the submitter has no permissions to run a BigMemJob 80 periodic_remove = !regexp("group_u_CMS.e_cms_caf_bigmem", AccountingGroup) && BigMemJob =?= True 82 job_submit_template +=
"\nqueue\n" 84 print(
"Determine number of pede threads...")
85 cms_process = mps_tools.get_process_object(os.path.join(Path, mergeCfg))
86 pede_options = cms_process.AlignmentProducer.algoConfig.pedeSteerer.options.value()
88 for option
in pede_options:
89 if "threads" in option:
90 n_threads = option.replace(
"threads",
"").
strip()
91 n_threads =
max(
map(
lambda x:
int(x), n_threads.split()))
93 if n_threads > 16: n_threads = 16
97 print(
"Determine required disk space on remote host...")
100 spco = subprocess.check_output
102 cmd = [
"du",
"--apparent-size"]
103 disk_usage = [
int(item.split()[0])
104 for directory
in (
"binaries",
"monitors",
"tree_files")
107 glob.glob(opj(lib.mssDir, directory,
"*"))).splitlines()]
108 disk_usage = sum(disk_usage)
111 job_submit_file = os.path.join(Path,
"job.submit")
112 with open(job_submit_file,
"w")
as f:
113 f.write(job_submit_template.format(script = os.path.abspath(script),
114 jobm = os.path.abspath(path),
115 pedeMem = lib.pedeMem,
116 disk =
int(disk_usage),
118 flavour = job_flavour))
120 return job_submit_file
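
# For illustration only (made-up values, assuming pedeMem=32000, 8 pede threads,
# roughly 5 GB of binaries and the "nextweek" flavour), the rendered job.submit
# would look roughly like:
#
#   executable            = /abs/path/jobData/jobm/theScript.sh
#   output                = /abs/path/jobData/jobm/STDOUT
#   error                 = /abs/path/jobData/jobm/STDOUT
#   transfer_output_files = ""
#   request_memory        = 32000M
#   request_disk          = 5000000
#   request_cpus          = 8
#   +JobFlavour           = "nextweek"
#
#   queue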
123 """Writes 'job.submit' file in `path`. 126 - `path`: job directory 127 - `script`: script to be executed 128 - `lib`: MPS lib object 129 - `proxy_path`: path to proxy (only used in case of requested proxy forward) 132 resources = lib.get_class(
"mille").
split(
"_")[1:]
133 job_flavour = resources[-1]
135 job_submit_template=
"""\ 137 executable = {script:s} 138 output = {jobm:s}/STDOUT 139 error = {jobm:s}/STDOUT 140 log = {jobm:s}/HTCJOB 141 notification = Always 142 transfer_output_files = "" 144 +JobFlavour = "{flavour:s}" 146 if "cafalca" in resources:
147 job_submit_template +=
"""\ 149 +AccountingGroup = "group_u_CMS.CAF.ALCA" 150 # automatically remove the job if the submitter has no permissions to run a CAF Job 151 periodic_remove = !regexp("group_u_CMS.CAF.ALCA", AccountingGroup) && CAFJob =?= True 154 if proxy_path
is not None:
155 job_submit_template +=
"""\ 156 +x509userproxy = "{proxy:s}" 158 job_submit_template +=
"\nqueue\n" 160 job_submit_file = os.path.join(Path,
"job.submit")
161 with open(job_submit_file,
"w")
as f:
162 f.write(job_submit_template.format(script = os.path.abspath(script),
163 jobm = os.path.abspath(path),
164 flavour = job_flavour,
167 return job_submit_file

parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.")
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-j", "--job-id", dest="job_id", nargs="*",
                    help=("job IDs to be submitted; "
                          "use either 'job<ID>' or directly '<ID>'"))
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help=("submit all setup Mille jobs; "
                          "maxJobs and --job-id are ignored"))
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help=("submit all setup Pede jobs; "
                          "maxJobs is ignored, but --job-id is respected"))
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state"))
parser.add_argument("--force-merge-manual", dest="forceMergeManual", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "
                          "Mille jobs are not in the OK state. Unlike --force-merge "
                          "this option assumes the user has edited theScript.sh and "
                          "alignment_merge.py to consistently pick up only the mille "
                          "output files that exist"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
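
# Illustrative invocations, assuming this script is installed as mps_fire.py
# (names and numbers are examples only):
#   mps_fire.py 10        # submit up to 10 Mille jobs that are in SETUP state
#   mps_fire.py -j 3 7    # restrict submission to jobs 3 and 7
#   mps_fire.py -m        # submit the Pede merge job(s)
#   mps_fire.py -m -f     # force the merge although some Mille jobs are not OK
#   mps_fire.py -p 10     # forward the VOMS proxy along with the jobs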

lib = mpslib.jobdatabase()
lib.read_db()

if args.allMille:
    # submit all Mille jobs and ignore 'maxJobs' supplied by the user
    args.maxJobs = lib.nJobs
    args.job_id = None

if args.job_id is None:
    job_mask = lib.JOBDIR
else:
    job_mask = []
    for job_id in args.job_id:
        invalid_id = False
        if job_id.startswith("job"): job_mask.append(job_id)
        elif job_id.startswith("m"): job_mask.append("job"+job_id)
        else:
            try:
                job_mask.append(lib.JOBDIR[int(job_id)-1])
            except ValueError:
                invalid_id = True
            except IndexError:
                print("ID provided to '-j/--job-id' is out of range:", job_id)
                sys.exit(1)

        if invalid_id or job_mask[-1] not in lib.JOBDIR:
            print("ID provided to '-j/--job-id' is invalid:", job_id)
            print("'-j/--job-id' requires the IDs to exist and to be of either", end=' ')
            print("of the following formats:")
            print(" - job<ID>")
            print(" - m<ID>")
            print(" - <ID>")
            sys.exit(1)

# directory where the jobs are set up
theJobData = os.path.join(os.getcwd(), "jobData")

# set the job name
theJobName = 'mpalign'
if lib.addFiles != '':
    theJobName = lib.addFiles

fire_htcondor = False

# fire the 'normal' parallel jobs (Mille jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        resources = '-q cmscafalcamille'
    # "cmscaf" found in resources
    elif 'cmscaf' in resources:
        # use the group g_cmscaf for the ordinary caf queue
        resources = '-q'+resources+' -m g_cmscaf'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources
    nSub = 0  # number of submitted jobs
    for i in range(lib.nJobs):
        if lib.JOBDIR[i] not in job_mask: continue
        if lib.JOBSTATUS[i] == 'SETUP':
            if nSub < args.maxJobs:
                if args.forwardProxy:
                    forward_proxy(os.path.join(theJobData, lib.JOBDIR[i]))

                # submit a new job and check the output of the submission command
                if fire_htcondor:
                    Path = os.path.join(theJobData, lib.JOBDIR[i])
                    scriptPath = os.path.join(Path, "theScript.sh")
                    if args.forwardProxy:
                        job_submit_file = write_HTCondor_submit_file_mille(
                            Path, scriptPath, lib, os.path.join(Path, ".user_proxy"))
                    else:
                        job_submit_file = write_HTCondor_submit_file_mille(
                            Path, scriptPath, lib)
                    submission = "condor_submit -batch-name %s %s"%\
                        (theJobName, job_submit_file)
                else:
                    submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
                        (theJobName, resources, theJobData, lib.JOBDIR[i])
                try:
                    result = subprocess.check_output(submission,
                                                     stderr=subprocess.STDOUT,
                                                     shell=True)
                except subprocess.CalledProcessError as e:
                    result = ""  # -> the submission check below will fail
                print(' '+result, end=' ')
                result = result.strip()

                # check whether the job was submitted and update the job database
                if fire_htcondor:
                    match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
                else:
                    match = re.search(r"Job <(\d+)> is submitted", result)
                if match:
                    # need a standard format for the job number
                    lib.JOBSTATUS[i] = 'SUBTD'
                    lib.JOBID[i] = match.group(1)
                    if fire_htcondor: lib.JOBID[i] += ".0"
                else:
                    print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i], result), end=' ')
                nSub += 1
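
    # Note on the submission-output parsing above (illustrative, typical output only):
    #   condor_submit usually ends with a line like "1 job(s) submitted to cluster 1234567."
    #   and LSF's bsub prints something like "Job <123456789> is submitted to queue <...>.";
    # the captured number becomes lib.JOBID[i], with ".0" appended for HTCondor to get
    # the usual <cluster>.<proc> form.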

# fire the merge job
else:
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        resources = '-q cmscafalcamille'
    elif "htcondor" in resources:
        fire_htcondor = True
    else:
        resources = '-q '+resources

    if not fire_htcondor:
        # allocate memory for the pede job (LSF only)
        resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem)

    # check whether all Mille jobs are OK
    mergeOK = True
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'OK':
            if 'DISABLED' not in lib.JOBSTATUS[i]:
                mergeOK = False
                break
    # loop over the merge jobs
    i = lib.nJobs
    while i < len(lib.JOBDIR):
        jobNumFrom1 = i+1

        if lib.JOBDIR[i] not in job_mask:
            i += 1
            continue

        # check whether the current job is in SETUP mode or whether submission is forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' % \
                  (jobNumFrom1, lib.JOBSTATUS[i]))
        elif not (mergeOK or args.forceMerge or args.forceMergeManual):
            print('Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).')
        else:
            Path = os.path.join(theJobData, lib.JOBDIR[i])
            backupScriptPath = os.path.join(Path, "theScript.sh.bak")
            scriptPath = os.path.join(Path, "theScript.sh")
            # force option invoked:
            if args.forceMerge:

                # make a backup copy of the script first, if it doesn't already exist
                if not os.path.isfile(backupScriptPath):
                    os.system('cp -p '+scriptPath+' '+backupScriptPath)

                # get the name of the merge cfg file
                command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.strip()

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # make a backup copy of the cfg
                backupCfgPath = os.path.join(Path, mergeCfg+".bak")
                cfgPath = os.path.join(Path, mergeCfg)
                if not os.path.isfile(backupCfgPath):
                    os.system('cp -p '+cfgPath+' '+backupCfgPath)

                # retrieve the weights configuration
                with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
                    weight_conf = cPickle.load(f)

                # blank the weights
                mps_tools.run_checked(["mps_weight.pl", "-c"])

                # apply the weights
                for name, weight in weight_conf:
                    print(" ".join(["mps_weight.pl", "-N", name, weight]))
                    mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])

                # rewrite the merge cfg using only 'OK' jobs (uses the first Mille job as base config)
                inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
                command = 'mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
                os.system(command)

                # rewrite theScript.sh using only 'OK' jobs
                command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
                os.system(command)

            else:
                # restore the backup copy of the script, if present
                if os.path.isfile(backupScriptPath):
                    os.system('cp -pf '+backupScriptPath+' '+scriptPath)

                # get the name of the merge cfg file
                command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                command = 'basename '+mergeCfg
                mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                mergeCfg = mergeCfg.replace('\n', '')

                if fire_htcondor:
                    job_submit_file = write_HTCondor_submit_file_pede(Path, scriptPath, mergeCfg, lib)

                # restore the backup copy of the cfg, if present
                backupCfgPath = Path+'/%s.bak' % mergeCfg
                cfgPath = Path+'/%s' % mergeCfg
                if os.path.isfile(backupCfgPath):
                    os.system('cp -pf '+backupCfgPath+' '+cfgPath)
            # submit the merge job
            nMerge = i - lib.nJobs  # 'i' is the index of the merge job
            curJobName = 'm'+str(nMerge)+'_'+theJobName
            if args.forwardProxy: forward_proxy(Path)
            if fire_htcondor:
                submission = ["condor_submit",
                              "-batch-name", curJobName,
                              job_submit_file]
            else:
                submission = ["bsub", "-J", curJobName, resources, scriptPath]
            try:
                result = subprocess.check_output(submission, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                result = ""  # -> the submission check below will fail
            print(' '+result, end=' ')
            result = result.strip()

            # check whether the merge job was submitted and update the job database
            if fire_htcondor:
                match = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", result)
            else:
                match = re.search(r"Job <(\d+)> is submitted", result)
            if match:
                lib.JOBSTATUS[i] = 'SUBTD'
                lib.JOBID[i] = match.group(1)
                # need a standard format for the job number
                if fire_htcondor: lib.JOBID[i] += ".0"
                print("jobid is", lib.JOBID[i])
            else:
                print('Submission of merge job seems to have failed:', result, end=' ')

        i += 1
        # end of while loop over merge jobs

lib.write_db()