3 from __future__
import print_function
11 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools
as mps_tools
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass
as mpslib
# ------------------------------------------------------------------------- #
# Command-line interface: optional switches first, then the positional      #
# inputs that describe the dataset and the batch environment.               #
# ------------------------------------------------------------------------- #
parser = argparse.ArgumentParser(description="Setup local mps database")

# optional switches
parser.add_argument("-m", "--setup-merge", dest="setup_merge",
                    action="store_true", default=False,
                    help="setup pede merge job")
parser.add_argument("-a", "--append",
                    action="store_true", default=False,
                    help="append jobs to existing list")
parser.add_argument("-M", "--memory", type=int,
                    help="memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name",
                    help=("name to be assigned to the jobs; Whitespaces and "
                          "colons are not allowed"))
parser.add_argument("-w", "--weight", type=float,
                    help="assign statistical weight")
parser.add_argument("-e", "--max-events", dest="max_events", type=int,
                    help="maximum number of events to process")

# positional arguments
parser.add_argument("batch_script",
                    help="path to the mille batch script template")
parser.add_argument("config_template",
                    help="path to the config template")
parser.add_argument("input_file_list",
                    help="path to the input file list")
parser.add_argument("n_jobs", type=int,
                    help="number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                          "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                          "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                          "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                          "contains a ':' the part before ':' defines the class for "
                          "mille jobs and the part after defines the pede job class"))
parser.add_argument("job_name",
                    help="name assigned to batch jobs")
parser.add_argument("merge_script",
                    help="path to the pede batch script template")
parser.add_argument("mss_dir",
                    help="name of the mass storage directory")

args = parser.parse_args(sys.argv[1:])
# Seed a fresh job database object with everything taken from the
# command line; later sections may still override some of these fields.
lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList    = args.input_file_list
lib.nJobs       = args.n_jobs
lib.classInf    = args.job_class
lib.addFiles    = args.job_name
# driver string selects whether a pede merge job is scheduled
lib.driver      = "merge" if args.setup_merge else ""
lib.mergeScript = args.merge_script
lib.mssDir      = args.mss_dir
lib.pedeMem     = args.memory
# --- sanity checks on the three user-supplied input files -----------------
# NOTE(review): this chunk is whitespace-mangled; the bare integers are the
# original file's line numbers fused into the text.  Original lines 73-74,
# 77-78 (and 81-82) are not visible here; presumably each error message is
# followed by an abort (sys.exit?) -- confirm against the full file.
71 if not os.access(args.batch_script, os.R_OK):
72 print(
"Bad 'batch_script' script name", args.batch_script)
75 if not os.access(args.config_template, os.R_OK):
76 print(
"Bad 'config_template' file name", args.config_template)
79 if not os.access(args.input_file_list, os.R_OK):
80 print(
"Bad input list file", args.input_file_list)
# Appending only makes sense on top of an existing local database file.
if not os.access("mps.db", os.R_OK): args.append = False

# Batch queues/classes accepted for mille jobs (LSF, CAF and HTCondor
# flavours).  Order matters only for the error printout below.
allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                         "cmsexpress", "htcondor_espresso",
                         "htcondor_microcentury", "htcondor_longlunch",
                         "htcondor_workday", "htcondor_tomorrow",
                         "htcondor_testmatch", "htcondor_nextweek")
if lib.get_class("mille") not in allowed_mille_classes:
    # complain and list what would have been accepted
    print("Bad job class for mille in class", args.job_class)
    print("Allowed classes:")
    for mille_class in allowed_mille_classes:
        print(" -", mille_class)
# Batch queues/classes accepted for the pede (merge) job; the HTCondor
# flavours use the big-memory variants.
allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")
if lib.get_class("pede") not in allowed_pede_classes:
    # complain and list what would have been accepted
    print("Bad job class for pede in class", args.job_class)
    print("Allowed classes:")
    for pede_class in allowed_pede_classes:
        print(" -", pede_class)
# When no merge script was given, derive it from the mille batch script
# by appending "merge"; then make sure whichever script applies is readable.
if args.merge_script == "":
    args.merge_script = args.batch_script + "merge"
if not os.access(args.merge_script, os.R_OK):
    # NOTE(review): the abort after this message (original lines 121-122)
    # is not visible in this chunk -- confirm against the full file.
    print("Bad merge script file name", args.merge_script)
# Transfer the mass-storage directory; an optional "<pool>:" prefix is
# split off into lib.mssDirPool, the remainder is kept as the directory.
if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        lib.mssDirPool = args.mss_dir.split(":")
        lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
        # NOTE(review): indentation of this assignment inferred from the
        # mangled source -- confirm it belongs to the ':' branch.
        lib.mssDir = args.mss_dir
# --- derive the default pede memory request from the config template ------
# NOTE(review): chunk is whitespace-mangled; the bare integers are original
# line numbers fused into the text.  Original lines 128-133 (which evidently
# define pedeMemMin and open a try: block) and 142-148 (the matching
# except handlers that fall back to int(1024*2.5)) are NOT visible here --
# confirm the try/except structure against the full file.
134 cms_process = mps_tools.get_process_object(args.config_template)
135 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
136 pedeMemDef = os.path.basename(pedeMemDef)
# keep only the trailing "<X>GB" token of the pede command name
137 pedeMemDef = pedeMemDef.split(
"_")[-1]
138 pedeMemDef = pedeMemDef.replace(
"GB",
"")
# convert GB -> MB; clamp to the minimum
140 pedeMemDef = 1024*
float(pedeMemDef)
141 if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
# fallback default (presumably inside an except branch -- see note above)
143 pedeMemDef =
int(1024*2.5)
# replace a missing or too-small user request by the derived default
149 if not args.memory
or args.memory < pedeMemMin:
150 print(
"Memory request ({}) is < {}, using {}.".
format(args.memory, pedeMemMin, pedeMemDef), end=
' ')
151 lib.pedeMem = args.memory = pedeMemDef
# Determine the highest already-existing job number when appending to a
# previous setup; on a nonsense value, wipe jobData and start from scratch.
# NOTE(review): the initialisation of nJobExist for the non-append path
# (original lines around 153-154) is not visible in this chunk.
if args.append and os.path.isdir("jobData"):
    # append mode and jobData exists: scan for jobNNN directories
    jobs = os.listdir("jobData")
    job_regex = re.compile(r"job([0-9]{3})")
    existing_jobs = [job_regex.search(item) for item in jobs]
    existing_jobs = [int(job.group(1)) for job in existing_jobs
                     if job is not None]
    # highest existing job number (raises IndexError if none match)
    nJobExist = sorted(existing_jobs)[-1]

# FIX: the original condition "nJobExist == 0 or nJobExist <= 0 or
# nJobExist > 999" was redundant -- "== 0" is already covered by "<= 0".
if nJobExist <= 0 or nJobExist > 999:
    # recreate an empty jobData tree
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
# --- create one jobNNN directory per mille job ----------------------------
# NOTE(review): chunk is whitespace-mangled; bare integers are original line
# numbers.  Original lines 170-171 (which evidently compute the job index
# and define `jobdir`, e.g. "job{0:03d}".format(j + nJobExist)) are NOT
# visible here -- confirm against the full file.
169 for j
in xrange(1, args.n_jobs + 1):
172 print(
"jobdir", jobdir)
173 os.makedirs(os.path.join(
"jobData", jobdir))
# absolute path to the jobData tree, used when composing batch scripts below
176 theJobData = os.path.abspath(
"jobData")
177 print(
"theJobData =", theJobData)
# --- preserve command-line-derived settings across a database re-read -----
# NOTE(review): chunk is whitespace-mangled; bare integers are original line
# numbers.  Original line 184 (presumably a tmpNJobs save) is not visible.
181 tmpBatchScript = lib.batchScript
182 tmpCfgTemplate = lib.cfgTemplate
183 tmpInfiList = lib.infiList
185 tmpClass = lib.classInf
186 tmpMergeScript = lib.mergeScript
187 tmpDriver = lib.driver
# NOTE(review): original lines 188-208 are missing from this chunk -- they
# presumably re-read the existing database and, when the last recorded job
# is the merge job "jobm", drop it from the job lists before the restore
# below.  Confirm against the full file.
193 if lib.JOBDIR[lib.nJobs] ==
"jobm":
# restore the settings saved above (original line 212, not visible,
# presumably restores nJobs)
209 lib.batchScript = tmpBatchScript
210 lib.cfgTemplate = tmpCfgTemplate
211 lib.infiList = tmpInfiList
213 lib.classInf = tmpClass
214 lib.mergeScript = tmpMergeScript
215 lib.driver = tmpDriver
# --- main loop: register each mille job and create its split/config/script -
# NOTE(review): chunk is whitespace-mangled; bare integers are original line
# numbers fused into the text.  Several original lines are missing from this
# chunk: 220 (presumably i = j + nJobExist), 223 (presumably a JOBID
# append), 234 (the `else:` matching the weight check), 242 (the `try:`
# matching the `except` at 246), 259-260 (the conditional tail of the
# max_events expression) and whatever defines `theIsn`.  Confirm all of
# these against the full file before touching this section.
219 for j
in xrange(1, args.n_jobs + 1):
221 jobdir =
"job{0:03d}".
format(i)
# bookkeeping entry for this job in the database lists
222 lib.JOBDIR.append(jobdir)
224 lib.JOBSTATUS.append(
"SETUP")
225 lib.JOBNTRY.append(0)
226 lib.JOBRUNTIME.append(0)
227 lib.JOBNEVT.append(0)
228 lib.JOBHOST.append(
"")
229 lib.JOBINCR.append(0)
230 lib.JOBREMARK.append(
"")
231 lib.JOBSP1.append(
"")
# statistical weight goes to SP2 when given (else branch at missing
# original line 234 presumably appends "")
232 if args.weight
is not None:
233 lib.JOBSP2.append(
str(args.weight))
235 lib.JOBSP2.append(
"")
236 lib.JOBSP3.append(args.name)
# build the mps_split.pl command; with --max-events only one "chunk" is cut
239 cmd = [
"mps_split.pl", args.input_file_list,
240 str(j
if args.max_events
is None else 1),
241 str(args.n_jobs
if args.max_events
is None else 1)]
# write the split file list; on failure mark the job as FAIL
243 with open(
"jobData/{}/theSplit".
format(jobdir),
"w")
as f:
245 subprocess.check_call(cmd, stdout = f)
246 except subprocess.CalledProcessError:
247 print(
" split failed")
248 lib.JOBSTATUS[i-1] =
"FAIL" 252 cmd = [
"mps_splice.py", args.config_template,
253 "jobData/{}/theSplit".
format(jobdir),
254 "jobData/{}/the.py".
format(jobdir), theIsn]
# with --max-events, pass skip/max event options to mps_splice.py
255 if args.max_events
is not None:
256 chunk_size =
int(args.max_events/args.n_jobs)
257 event_options = [
"--skip-events",
str(chunk_size*(j-1))]
# leftover events presumably go to the last job (tail of this
# expression is at missing original lines 259-260)
258 max_events = (args.max_events - (args.n_jobs-1)*chunk_size
261 event_options.extend([
"--max-events",
str(max_events)])
262 cmd.extend(event_options)
264 mps_tools.run_checked(cmd)
# create the per-job batch script via mps_script.pl
267 print(
"mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".
format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
268 mps_tools.run_checked([
"mps_script.pl", args.batch_script,
269 "jobData/{}/theScript.sh".
format(jobdir),
270 os.path.join(theJobData, jobdir),
"the.py",
271 "jobData/{}/theSplit".
format(jobdir), theIsn,
272 args.mss_dir, lib.mssDirPool])
# --- always register a bookkeeping entry for the merge job ----------------
# NOTE(review): chunk is whitespace-mangled; bare integers are original line
# numbers.  Original lines 276 and 278 are missing -- presumably
# jobdir = "jobm" and a JOBID append.  Confirm against the full file.
277 lib.JOBDIR.append(jobdir)
279 lib.JOBSTATUS.append(
"SETUP")
280 lib.JOBNTRY.append(0)
281 lib.JOBRUNTIME.append(0)
282 lib.JOBNEVT.append(0)
283 lib.JOBHOST.append(
"")
284 lib.JOBINCR.append(0)
285 lib.JOBREMARK.append(
"")
286 lib.JOBSP1.append(
"")
287 lib.JOBSP2.append(
"")
288 lib.JOBSP3.append(
"")
# --- create the merge job directory, config and batch script --------------
# NOTE(review): chunk is whitespace-mangled; bare integers are original line
# numbers.  The guard at original line 293 (presumably `if args.setup_merge:`,
# making this whole section conditional and indented) and the closing of the
# argument list at original line 313 (presumably `lib.mssDirPool])`) are NOT
# visible here -- confirm against the full file.
294 shutil.rmtree(
"jobData/jobm", ignore_errors =
True)
295 os.makedirs(
"jobData/jobm")
296 print(
"Create dir jobData/jobm")
# total number of mille jobs the merge job has to collect
299 nJobsMerge = args.n_jobs+nJobExist
# create the merge job cfg via mps_merge.py
302 print(
"mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".
format(args.config_template, theJobData, nJobsMerge))
303 mps_tools.run_checked([
"mps_merge.py",
"-w", args.config_template,
304 "jobData/jobm/alignment_merge.py",
305 os.path.join(theJobData,
"jobm"),
str(nJobsMerge)])
# create the merge job batch script via mps_scriptm.pl
308 print(
"mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".
format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
309 mps_tools.run_checked([
"mps_scriptm.pl", args.merge_script,
310 "jobData/jobm/theScript.sh",
311 os.path.join(theJobData,
"jobm"),
312 "alignment_merge.py",
str(nJobsMerge), args.mss_dir,
# --- archive the input scripts/configs into a numbered tarball ------------
# NOTE(review): chunk is whitespace-mangled; bare integers are original line
# numbers.  Original line 328 (presumably `if args.setup_merge:` guarding
# the merge-script copy at 329) is NOT visible here -- confirm against the
# full file.
318 backups = os.listdir(
"jobData")
# find the next free ScriptsAndCfgNNN backup number
319 bu_regex = re.compile(
r"ScriptsAndCfg([0-9]{3})\.tar")
320 existing_backups = [bu_regex.search(item)
for item
in backups]
321 existing_backups = [
int(bu.group(1))
for bu
in existing_backups
if bu
is not None]
322 i = (0
if len(existing_backups) == 0
else sorted(existing_backups)[-1]) + 1
323 ScriptCfg =
"ScriptsAndCfg{0:03d}".
format(i)
324 ScriptCfg = os.path.join(
"jobData", ScriptCfg)
# stage the files into a directory, tar it, then remove the staging dir
325 os.makedirs(ScriptCfg)
326 for f
in (args.batch_script, args.config_template, args.input_file_list):
327 shutil.copy2(f, ScriptCfg)
329 shutil.copy2(args.merge_script, ScriptCfg)
331 with tarfile.open(ScriptCfg+
".tar",
"w")
as tar: tar.add(ScriptCfg)
332 shutil.rmtree(ScriptCfg)
S & print(S &os, JobReport::InputFile const &f)
static std::string join(char **cmd)