3 from __future__
import print_function
4 from builtins
import range
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools
as mps_tools
13 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass
as mpslib
16 parser = argparse.ArgumentParser(description =
"Setup local mps database")
17 parser.add_argument(
"-m",
"--setup-merge", dest =
"setup_merge",
18 action =
"store_true", default =
False,
19 help =
"setup pede merge job")
20 parser.add_argument(
"-a",
"--append", action =
"store_true", default =
False,
21 help =
"append jobs to existing list")
22 parser.add_argument(
"-M",
"--memory", type = int,
23 help =
"memory (MB) to be allocated for pede")
24 parser.add_argument(
"-N",
"--name",
25 help = (
"name to be assigned to the jobs; Whitespaces and "
26 "colons are not allowed"))
27 parser.add_argument(
"-w",
"--weight", type = float,
28 help =
"assign statistical weight")
29 parser.add_argument(
"-e",
"--max-events", dest =
"max_events", type = int,
30 help =
"maximum number of events to process")
32 parser.add_argument(
"batch_script",
33 help =
"path to the mille batch script template")
34 parser.add_argument(
"config_template",
35 help =
"path to the config template")
36 parser.add_argument(
"input_file_list",
37 help =
"path to the input file list")
38 parser.add_argument(
"n_jobs", type = int,
39 help =
"number of jobs assigned to this dataset")
40 parser.add_argument(
"job_class",
41 help=(
"can be any of the normal LSF queues (8nm, 1nh, 8nh, "
42 "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
43 "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
44 "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
45 "contains a ':' the part before ':' defines the class for "
46 "mille jobs and the part after defines the pede job class"))
47 parser.add_argument(
"job_name",
48 help =
"name assigned to batch jobs")
49 parser.add_argument(
"merge_script",
50 help =
"path to the pede batch script template")
51 parser.add_argument(
"mss_dir",
52 help =
"name of the mass storage directory")
54 args = parser.parse_args(sys.argv[1:])
58 lib = mpslib.jobdatabase()
59 lib.batchScript = args.batch_script
60 lib.cfgTemplate = args.config_template
61 lib.infiList = args.input_file_list
62 lib.nJobs = args.n_jobs
63 lib.classInf = args.job_class
64 lib.addFiles = args.job_name
65 lib.driver =
"merge" if args.setup_merge
else ""
66 lib.mergeScript = args.merge_script
68 lib.mssDir = args.mss_dir
69 lib.pedeMem = args.memory
72 if not os.access(args.batch_script, os.R_OK):
73 print(
"Bad 'batch_script' script name", args.batch_script)
76 if not os.access(args.config_template, os.R_OK):
77 print(
"Bad 'config_template' file name", args.config_template)
80 if not os.access(args.input_file_list, os.R_OK):
81 print(
"Bad input list file", args.input_file_list)
85 if not os.access(
"mps.db", os.R_OK): args.append =
False
87 allowed_mille_classes = (
"lxplus",
"cmscaf1nh",
"cmscaf1nd",
"cmscaf1nw",
88 "cmscafspec1nh",
"cmscafspec1nd",
"cmscafspec1nw",
89 "8nm",
"1nh",
"8nh",
"1nd",
"2nd",
"1nw",
"2nw",
90 "cmsexpress",
"htcondor_cafalca_espresso",
"htcondor_espresso",
91 "htcondor_cafalca_microcentury",
"htcondor_microcentury",
92 "htcondor_cafalca_longlunch",
"htcondor_longlunch",
93 "htcondor_cafalca_workday",
"htcondor_workday",
94 "htcondor_cafalca_tomorrow",
"htcondor_tomorrow",
95 "htcondor_cafalca_testmatch",
"htcondor_testmatch",
96 "htcondor_cafalca_nextweek",
"htcondor_nextweek")
97 if lib.get_class(
"mille")
not in allowed_mille_classes:
98 print(
"Bad job class for mille in class", args.job_class)
99 print(
"Allowed classes:")
100 for mille_class
in allowed_mille_classes:
101 print(
" -", mille_class)
104 allowed_pede_classes = (
"lxplus",
"cmscaf1nh",
"cmscaf1nd",
"cmscaf1nw",
105 "cmscafspec1nh",
"cmscafspec1nd",
"cmscafspec1nw",
106 "8nm",
"1nh",
"8nh",
"1nd",
"2nd",
"1nw",
"2nw",
107 "htcondor_bigmem_espresso",
108 "htcondor_bigmem_microcentury",
109 "htcondor_bigmem_longlunch",
110 "htcondor_bigmem_workday",
111 "htcondor_bigmem_tomorrow",
112 "htcondor_bigmem_testmatch",
113 "htcondor_bigmem_nextweek")
114 if lib.get_class(
"pede")
not in allowed_pede_classes:
115 print(
"Bad job class for pede in class", args.job_class)
116 print(
"Allowed classes:")
117 for pede_class
in allowed_pede_classes:
118 print(
" -", pede_class)
122 if args.merge_script ==
"":
123 args.merge_script = args.batch_script +
"merge"
124 if not os.access(args.merge_script, os.R_OK):
125 print(
"Bad merge script file name", args.merge_script)
128 if args.mss_dir.strip() !=
"":
129 if ":" in args.mss_dir:
130 lib.mssDirPool = args.mss_dir.split(
":")
131 lib.mssDirPool, args.mss_dir = lib.mssDirPool[0],
":".
join(lib.mssDirPool[1:])
132 lib.mssDir = args.mss_dir
139 cms_process = mps_tools.get_process_object(args.config_template)
140 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
141 pedeMemDef = os.path.basename(pedeMemDef)
142 pedeMemDef = pedeMemDef.split(
"_")[-1]
143 pedeMemDef = pedeMemDef.replace(
"GB",
"")
145 pedeMemDef = 1024*
float(pedeMemDef)
146 if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
148 pedeMemDef =
int(1024*2.5)
154 if not args.memory
or args.memory < pedeMemMin:
155 print(
"Memory request ({}) is < {}, using {}.".
format(args.memory, pedeMemMin, pedeMemDef), end=
' ')
156 lib.pedeMem = args.memory = pedeMemDef
160 if args.append
and os.path.isdir(
"jobData"):
162 jobs = os.listdir(
"jobData")
163 job_regex = re.compile(
r"job([0-9]{3})")
164 existing_jobs = [job_regex.search(item)
for item
in jobs]
165 existing_jobs = [
int(job.group(1))
for job
in existing_jobs
if job
is not None]
166 nJobExist = sorted(existing_jobs)[-1]
168 if nJobExist == 0
or nJobExist <=0
or nJobExist > 999:
170 mps_tools.remove_existing_object(
"jobData")
171 os.makedirs(
"jobData")
174 for j
in range(1, args.n_jobs + 1):
177 print(
"jobdir", jobdir)
178 os.makedirs(os.path.join(
"jobData", jobdir))
181 theJobData = os.path.abspath(
"jobData")
182 print(
"theJobData =", theJobData)
186 tmpBatchScript = lib.batchScript
187 tmpCfgTemplate = lib.cfgTemplate
188 tmpInfiList = lib.infiList
190 tmpClass = lib.classInf
191 tmpMergeScript = lib.mergeScript
192 tmpDriver = lib.driver
198 if lib.JOBDIR[lib.nJobs] ==
"jobm":
214 lib.batchScript = tmpBatchScript
215 lib.cfgTemplate = tmpCfgTemplate
216 lib.infiList = tmpInfiList
218 lib.classInf = tmpClass
219 lib.mergeScript = tmpMergeScript
220 lib.driver = tmpDriver
224 for j
in range(1, args.n_jobs + 1):
226 jobdir =
"job{0:03d}".
format(i)
227 lib.JOBDIR.append(jobdir)
229 lib.JOBSTATUS.append(
"SETUP")
230 lib.JOBNTRY.append(0)
231 lib.JOBRUNTIME.append(0)
232 lib.JOBNEVT.append(0)
233 lib.JOBHOST.append(
"")
234 lib.JOBINCR.append(0)
235 lib.JOBREMARK.append(
"")
236 lib.JOBSP1.append(
"")
237 if args.weight
is not None:
238 lib.JOBSP2.append(
str(args.weight))
240 lib.JOBSP2.append(
"")
241 lib.JOBSP3.append(args.name)
244 cmd = [
"mps_split.pl", args.input_file_list,
245 str(j
if args.max_events
is None else 1),
246 str(args.n_jobs
if args.max_events
is None else 1)]
248 with open(
"jobData/{}/theSplit".
format(jobdir),
"w")
as f:
250 subprocess.check_call(cmd, stdout = f)
251 except subprocess.CalledProcessError:
252 print(
" split failed")
253 lib.JOBSTATUS[i-1] =
"FAIL"
257 cmd = [
"mps_splice.py", args.config_template,
258 "jobData/{}/theSplit".
format(jobdir),
259 "jobData/{}/the.py".
format(jobdir), theIsn]
260 if args.max_events
is not None:
261 chunk_size =
int(args.max_events/args.n_jobs)
262 event_options = [
"--skip-events",
str(chunk_size*(j-1))]
263 max_events = (args.max_events - (args.n_jobs-1)*chunk_size
266 event_options.extend([
"--max-events",
str(max_events)])
267 cmd.extend(event_options)
269 mps_tools.run_checked(cmd)
272 print(
"mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".
format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
273 mps_tools.run_checked([
"mps_script.pl", args.batch_script,
274 "jobData/{}/theScript.sh".
format(jobdir),
275 os.path.join(theJobData, jobdir),
"the.py",
276 "jobData/{}/theSplit".
format(jobdir), theIsn,
277 args.mss_dir, lib.mssDirPool])
282 lib.JOBDIR.append(jobdir)
284 lib.JOBSTATUS.append(
"SETUP")
285 lib.JOBNTRY.append(0)
286 lib.JOBRUNTIME.append(0)
287 lib.JOBNEVT.append(0)
288 lib.JOBHOST.append(
"")
289 lib.JOBINCR.append(0)
290 lib.JOBREMARK.append(
"")
291 lib.JOBSP1.append(
"")
292 lib.JOBSP2.append(
"")
293 lib.JOBSP3.append(
"")
299 shutil.rmtree(
"jobData/jobm", ignore_errors =
True)
300 os.makedirs(
"jobData/jobm")
301 print(
"Create dir jobData/jobm")
304 nJobsMerge = args.n_jobs+nJobExist
307 print(
"mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".
format(args.config_template, theJobData, nJobsMerge))
308 mps_tools.run_checked([
"mps_merge.py",
"-w", args.config_template,
309 "jobData/jobm/alignment_merge.py",
310 os.path.join(theJobData,
"jobm"),
str(nJobsMerge)])
313 print(
"mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".
format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
314 mps_tools.run_checked([
"mps_scriptm.pl", args.merge_script,
315 "jobData/jobm/theScript.sh",
316 os.path.join(theJobData,
"jobm"),
317 "alignment_merge.py",
str(nJobsMerge), args.mss_dir,
323 backups = os.listdir(
"jobData")
324 bu_regex = re.compile(
r"ScriptsAndCfg([0-9]{3})\.tar")
325 existing_backups = [bu_regex.search(item)
for item
in backups]
326 existing_backups = [
int(bu.group(1))
for bu
in existing_backups
if bu
is not None]
327 i = (0
if len(existing_backups) == 0
else sorted(existing_backups)[-1]) + 1
328 ScriptCfg =
"ScriptsAndCfg{0:03d}".
format(i)
329 ScriptCfg = os.path.join(
"jobData", ScriptCfg)
330 os.makedirs(ScriptCfg)
331 for f
in (args.batch_script, args.config_template, args.input_file_list):
332 shutil.copy2(f, ScriptCfg)
334 shutil.copy2(args.merge_script, ScriptCfg)
336 with tarfile.open(ScriptCfg+
".tar",
"w")
as tar: tar.add(ScriptCfg)
337 shutil.rmtree(ScriptCfg)