3 from __future__
import print_function
4 from builtins
import range
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools
as mps_tools
13 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass
as mpslib
# Command-line interface of the mps setup script: eight positional
# arguments describing the dataset plus optional switches controlling
# merge-job setup, appending, pede memory, naming, statistical weight and
# event limits.
parser = argparse.ArgumentParser(description="Setup local mps database")
parser.add_argument("-m", "--setup-merge", dest="setup_merge",
                    action="store_true", default=False,
                    help="setup pede merge job")
parser.add_argument("-a", "--append", action="store_true", default=False,
                    help="append jobs to existing list")
parser.add_argument("-M", "--memory", type=int,
                    help="memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name",
                    help=("name to be assigned to the jobs; Whitespaces and "
                          "colons are not allowed"))
parser.add_argument("-w", "--weight", type=float,
                    help="assign statistical weight")
parser.add_argument("-e", "--max-events", dest="max_events", type=int,
                    help="maximum number of events to process")

parser.add_argument("batch_script",
                    help="path to the mille batch script template")
parser.add_argument("config_template",
                    help="path to the config template")
parser.add_argument("input_file_list",
                    help="path to the input file list")
parser.add_argument("n_jobs", type=int,
                    help="number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                          "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                          "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                          "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                          "contains a ':' the part before ':' defines the class for "
                          "mille jobs and the part after defines the pede job class"))
parser.add_argument("job_name",
                    help="name assigned to batch jobs")
parser.add_argument("merge_script",
                    help="path to the pede batch script template")
parser.add_argument("mss_dir",
                    help="name of the mass storage directory")
# Parse the command line and initialise the local mps job database object
# with the per-dataset configuration.
args = parser.parse_args(sys.argv[1:])

lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList = args.input_file_list
lib.nJobs = args.n_jobs
lib.classInf = args.job_class
lib.addFiles = args.job_name
# "merge" selects the pede merge workflow in the database; empty otherwise
lib.driver = "merge" if args.setup_merge else ""
lib.mergeScript = args.merge_script
lib.mssDir = args.mss_dir
lib.pedeMem = args.memory
# ------------------------------------------------------------------------
# Sanity checks on the input files: abort early if any of them cannot be
# read instead of failing later inside the batch jobs.  (The garbled source
# printed the error without aborting; the gaps in the original numbering
# sit exactly where the aborts belong.)
if not os.access(args.batch_script, os.R_OK):
    print("Bad 'batch_script' script name", args.batch_script)
    sys.exit(1)

if not os.access(args.config_template, os.R_OK):
    print("Bad 'config_template' file name", args.config_template)
    sys.exit(1)

if not os.access(args.input_file_list, os.R_OK):
    print("Bad input list file", args.input_file_list)
    sys.exit(1)

# ignore the append option if there is no database file to append to yet
if not os.access("mps.db", os.R_OK): args.append = False

# batch queues accepted for the mille jobs
allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                         "cmsexpress", "htcondor_espresso",
                         "htcondor_microcentury", "htcondor_longlunch",
                         "htcondor_workday", "htcondor_tomorrow",
                         "htcondor_testmatch", "htcondor_nextweek")
if lib.get_class("mille") not in allowed_mille_classes:
    print("Bad job class for mille in class", args.job_class)
    print("Allowed classes:")
    for mille_class in allowed_mille_classes:
        print(" -", mille_class)
    sys.exit(1)
# batch queues accepted for the pede job
allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")
if lib.get_class("pede") not in allowed_pede_classes:
    print("Bad job class for pede in class", args.job_class)
    print("Allowed classes:")
    for pede_class in allowed_pede_classes:
        print(" -", pede_class)
    # abort on an unknown queue (missing in the garbled source; mirrors the
    # mille-class check above)
    sys.exit(1)
# Default the merge script to "<batch_script>merge" and verify it is
# readable.
# NOTE(review): in the full file this section may be guarded by
# "if args.setup_merge:" -- not visible in this chunk, confirm.
if args.merge_script == "":
    args.merge_script = args.batch_script + "merge"
if not os.access(args.merge_script, os.R_OK):
    print("Bad merge script file name", args.merge_script)
    sys.exit(1)

# An mss_dir of the form "pool:path" carries the storage pool in front of
# the actual directory; split it into the two database fields.
if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        lib.mssDirPool = args.mss_dir.split(":")
        lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
        lib.mssDir = args.mss_dir
# Determine the default pede memory requirement from the pede command in
# the config template: a trailing "_<n>GB" in the executable name is
# converted to MB (e.g. "pede_8GB" -> 8192).
cms_process = mps_tools.get_process_object(args.config_template)
pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
pedeMemDef = os.path.basename(pedeMemDef)
pedeMemDef = pedeMemDef.split("_")[-1]
pedeMemDef = pedeMemDef.replace("GB", "")
# NOTE(review): the try/except structure was reconstructed -- the garbled
# source lost the "try:"/"except" lines, but the int(1024*2.5) fallback
# only makes sense as the failure branch of the float conversion.
# pedeMemMin is defined in a part of the file not shown here -- confirm.
try:
    pedeMemDef = 1024*float(pedeMemDef)
    if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
except ValueError:
    # no "<n>GB" suffix on the command name -> fall back to 2.5 GB
    pedeMemDef = int(1024*2.5)

# Use the default if no (or too small a) memory request was given.
if not args.memory or args.memory < pedeMemMin:
    print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
    lib.pedeMem = args.memory = pedeMemDef
# When appending, find the highest existing job number in jobData/ so new
# jobs continue the numbering.
# NOTE(review): the initialisation of nJobExist is not visible in this
# chunk; added here so the later checks cannot hit an undefined name.
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    jobs = os.listdir("jobData")
    job_regex = re.compile(r"job([0-9]{3})")
    existing_jobs = [job_regex.search(item) for item in jobs]
    existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
    nJobExist = sorted(existing_jobs)[-1]

# Nothing valid to append to -> wipe jobData/ and start from scratch.
# (Simplified: the original "nJobExist == 0 or nJobExist <= 0" was
# redundant, "== 0" is implied by "<= 0".)
if nJobExist <= 0 or nJobExist > 999:
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0
# Create one jobData/jobNNN directory per mille job, continuing the
# numbering after any pre-existing jobs.
for j in range(1, args.n_jobs + 1):
    # NOTE(review): the jobdir assignment is missing from this garbled
    # chunk; the "job{0:03d}" pattern below matches the one used when the
    # jobs are registered in the database -- confirm against the full file.
    jobdir = "job{0:03d}".format(j+nJobExist)
    print("jobdir", jobdir)
    os.makedirs(os.path.join("jobData", jobdir))

# Absolute path to the job data, used when building the batch scripts.
theJobData = os.path.abspath("jobData")
print("theJobData =", theJobData)
# --- merge-job setup: save the dataset-specific database fields, handle a
# --- pre-existing merge job entry, then restore the saved fields.
# NOTE(review): this span is heavily garbled and several original lines
# (the enclosing setup_merge guard, one saved field, and the body handling
# an existing "jobm" entry) are missing from this chunk; code left
# untouched, comments only.
182 tmpBatchScript = lib.batchScript
183 tmpCfgTemplate = lib.cfgTemplate
184 tmpInfiList = lib.infiList
186 tmpClass = lib.classInf
187 tmpMergeScript = lib.mergeScript
188 tmpDriver = lib.driver
# is the last job in the database an existing merge job ("jobm")?
194 if lib.JOBDIR[lib.nJobs] ==
"jobm":
# restore the fields saved above
210 lib.batchScript = tmpBatchScript
211 lib.cfgTemplate = tmpCfgTemplate
212 lib.infiList = tmpInfiList
214 lib.classInf = tmpClass
215 lib.mergeScript = tmpMergeScript
216 lib.driver = tmpDriver
# --- per-job loop: register each mille job in the database, split the
# --- input file list, splice the config and create the batch run script.
# NOTE(review): garbled span with missing original lines (the "i" and
# "theIsn" assignments, an "else:", a "try:", and the continuation of the
# conditional expression computing max_events); code left untouched,
# comments only.
220 for j
in range(1, args.n_jobs + 1):
222 jobdir =
"job{0:03d}".
format(i)
# bookkeeping fields for this job in the mps database
223 lib.JOBDIR.append(jobdir)
225 lib.JOBSTATUS.append(
"SETUP")
226 lib.JOBNTRY.append(0)
227 lib.JOBRUNTIME.append(0)
228 lib.JOBNEVT.append(0)
229 lib.JOBHOST.append(
"")
230 lib.JOBINCR.append(0)
231 lib.JOBREMARK.append(
"")
232 lib.JOBSP1.append(
"")
# statistical weight goes into JOBSP2 (empty string when not given)
233 if args.weight
is not None:
234 lib.JOBSP2.append(
str(args.weight))
236 lib.JOBSP2.append(
"")
237 lib.JOBSP3.append(args.name)
# split the input file list; with --max-events a single split suffices
240 cmd = [
"mps_split.pl", args.input_file_list,
241 str(j
if args.max_events
is None else 1),
242 str(args.n_jobs
is None else 1)]
244 with open(
"jobData/{}/theSplit".
format(jobdir),
"w")
as f:
246 subprocess.check_call(cmd, stdout = f)
247 except subprocess.CalledProcessError:
# a failed split marks the job FAIL but does not abort the setup
248 print(
" split failed")
249 lib.JOBSTATUS[i-1] =
"FAIL" 253 cmd = [
"mps_splice.py", args.config_template,
254 "jobData/{}/theSplit".
format(jobdir),
255 "jobData/{}/the.py".
format(jobdir), theIsn]
# forward --skip-events/--max-events chunking to mps_splice.py
256 if args.max_events
is not None:
257 chunk_size =
int(args.max_events/args.n_jobs)
258 event_options = [
"--skip-events",
str(chunk_size*(j-1))]
259 max_events = (args.max_events - (args.n_jobs-1)*chunk_size
262 event_options.extend([
"--max-events",
str(max_events)])
263 cmd.extend(event_options)
265 mps_tools.run_checked(cmd)
# create the batch run script for this job
268 print(
"mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".
format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
269 mps_tools.run_checked([
"mps_script.pl", args.batch_script,
270 "jobData/{}/theScript.sh".
format(jobdir),
271 os.path.join(theJobData, jobdir),
"the.py",
272 "jobData/{}/theSplit".
format(jobdir), theIsn,
273 args.mss_dir, lib.mssDirPool])
# --- merge-job database entry: append the merge job record with empty
# --- bookkeeping fields.
# NOTE(review): the jobdir assignment for this entry (presumably "jobm")
# and the enclosing guard are missing from this garbled chunk; code left
# untouched, comments only.
278 lib.JOBDIR.append(jobdir)
280 lib.JOBSTATUS.append(
"SETUP")
281 lib.JOBNTRY.append(0)
282 lib.JOBRUNTIME.append(0)
283 lib.JOBNEVT.append(0)
284 lib.JOBHOST.append(
"")
285 lib.JOBINCR.append(0)
286 lib.JOBREMARK.append(
"")
287 lib.JOBSP1.append(
"")
288 lib.JOBSP2.append(
"")
289 lib.JOBSP3.append(
"")
# Recreate a clean merge-job directory; ignore_errors covers the case
# where no previous jobData/jobm exists.
shutil.rmtree("jobData/jobm", ignore_errors=True)
os.makedirs("jobData/jobm")
print("Create dir jobData/jobm")
# Total number of mille jobs the merge (pede) job has to collect.
nJobsMerge = args.n_jobs+nJobExist

# create the merge job configuration
print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
                       "jobData/jobm/alignment_merge.py",
                       os.path.join(theJobData, "jobm"), str(nJobsMerge)])

# create the merge job batch script
print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
                       "jobData/jobm/theScript.sh",
                       os.path.join(theJobData, "jobm"),
                       "alignment_merge.py", str(nJobsMerge), args.mss_dir,
                       # NOTE(review): the final element was truncated in
                       # the garbled source; reconstructed from the echo
                       # print above -- confirm against the full file.
                       lib.mssDirPool])
# Back up the scripts and configs defining this setup into
# jobData/ScriptsAndCfgNNN.tar, numbering sequentially after any backups
# already present.
backups = os.listdir("jobData")
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
existing_backups = [bu_regex.search(item) for item in backups]
existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
# next free backup number (1 when none exist yet)
i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
ScriptCfg = os.path.join("jobData", ScriptCfg)
os.makedirs(ScriptCfg)
for f in (args.batch_script, args.config_template, args.input_file_list):
    shutil.copy2(f, ScriptCfg)
# NOTE(review): a missing original line suggests this copy may be guarded
# by "if args.setup_merge:" in the full file -- confirm; merge_script is a
# required argument, so the unconditional copy is at least well-defined.
shutil.copy2(args.merge_script, ScriptCfg)

# tar the backup directory and remove the unpacked copy
with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)
S & print(S &os, JobReport::InputFile const &f)
static std::string join(char **cmd)