from __future__ import print_function

import argparse
import os
import re
import shutil
import subprocess
import sys
import tarfile

import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
# Command-line interface: options first, then the positional arguments that
# describe the dataset to be added to the local mps database.
parser = argparse.ArgumentParser(description = "Setup local mps database")
parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
                    action = "store_true", default = False,
                    help = "setup pede merge job")
parser.add_argument("-a", "--append", action = "store_true", default = False,
                    help = "append jobs to existing list")
parser.add_argument("-M", "--memory", type = int,
                    help = "memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name",
                    help = ("name to be assigned to the jobs; Whitespaces and "
                            "colons are not allowed"))
parser.add_argument("-w", "--weight", type = float,
                    help = "assign statistical weight")
parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
                    help = "maximum number of events to process")

parser.add_argument("batch_script",
                    help = "path to the mille batch script template")
parser.add_argument("config_template",
                    help = "path to the config template")
parser.add_argument("input_file_list",
                    help = "path to the input file list")
parser.add_argument("n_jobs", type = int,
                    help = "number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                          "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                          "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                          "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                          "contains a ':' the part before ':' defines the class for "
                          "mille jobs and the part after defines the pede job class"))
parser.add_argument("job_name",
                    help = "name assigned to batch jobs")
parser.add_argument("merge_script",
                    help = "path to the pede batch script template")
parser.add_argument("mss_dir",
                    help = "name of the mass storage directory")

args = parser.parse_args(sys.argv[1:])
# Transfer the parsed command-line settings into the local job database.
lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList = args.input_file_list
lib.nJobs = args.n_jobs
lib.classInf = args.job_class
lib.addFiles = args.job_name
# "merge" flags that a pede merge job has to be set up as well.
lib.driver = "merge" if args.setup_merge else ""
lib.mergeScript = args.merge_script
lib.mssDir = args.mss_dir
lib.pedeMem = args.memory
# Sanity checks: all input files must be readable; a bad job class is fatal.
if not os.access(args.batch_script, os.R_OK):
    print("Bad 'batch_script' script name", args.batch_script)
    sys.exit(1)  # abort: nothing useful can be set up without the template

if not os.access(args.config_template, os.R_OK):
    print("Bad 'config_template' file name", args.config_template)
    sys.exit(1)

if not os.access(args.input_file_list, os.R_OK):
    print("Bad input list file", args.input_file_list)
    sys.exit(1)

# ignore --append if there is no database file yet to append to
if not os.access("mps.db", os.R_OK): args.append = False

allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw")
if lib.get_class("mille") not in allowed_mille_classes:
    print("Bad job class for mille in class", args.job_class)
    print("Allowed classes:")
    for mille_class in allowed_mille_classes:
        print(" -", mille_class)
    sys.exit(1)
# Validate the pede job class; pede additionally accepts the HTCondor
# big-memory flavours.
allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")
if lib.get_class("pede") not in allowed_pede_classes:
    print("Bad job class for pede in class", args.job_class)
    print("Allowed classes:")
    for pede_class in allowed_pede_classes:
        print(" -", pede_class)
    sys.exit(1)  # abort on an unknown pede queue
# Default the merge script to "<batch_script>merge" and verify readability.
if args.merge_script == "":
    args.merge_script = args.batch_script + "merge"
if not os.access(args.merge_script, os.R_OK):
    print("Bad merge script file name", args.merge_script)
    sys.exit(1)
# The mass storage directory may carry a "<pool>:" prefix; split it off
# into lib.mssDirPool and keep only the plain path in lib.mssDir.
if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        lib.mssDirPool = args.mss_dir.split(":")
        lib.mssDirPool, args.mss_dir = \
            lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
        lib.mssDir = args.mss_dir
# Determine the default pede memory request from the pede executable name
# configured in the template (e.g. "pede_4GB" -> 4096 MB).
pedeMemMin = 1024  # minimum memory allocated for pede: 1024 MB = 1 GB

cms_process = mps_tools.get_process_object(args.config_template)
pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
pedeMemDef = os.path.basename(pedeMemDef)
pedeMemDef = pedeMemDef.split("_")[-1]
pedeMemDef = pedeMemDef.replace("GB", "")
try:  # check if a default is encoded in the pede executable name
    pedeMemDef = 1024*float(pedeMemDef)
    if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
except ValueError:  # not a number -> fall back to 2.5 GB
    pedeMemDef = int(1024*2.5)

# if the user gave no (or too small a) memory request, use the default
if not args.memory or args.memory < pedeMemMin:
    print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
    lib.pedeMem = args.memory = pedeMemDef
# In append mode, find the highest existing job number so new jobs continue
# the numbering; otherwise (or if nothing valid exists) start fresh.
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    jobs = os.listdir("jobData")
    job_regex = re.compile(r"job([0-9]{3})")
    existing_jobs = [job_regex.search(item) for item in jobs]
    existing_jobs = [int(job.group(1))
                     for job in existing_jobs
                     if job is not None]
    # guard against an empty jobData directory (no jobNNN entries)
    nJobExist = sorted(existing_jobs)[-1] if existing_jobs else 0

if nJobExist == 0 or nJobExist <= 0 or nJobExist > 999:
    # create a new jobData directory (drops any stale content)
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0
# Create one directory per new mille job, numbered after the existing jobs.
for j in xrange(1, args.n_jobs + 1):
    i = j + nJobExist  # absolute job number
    jobdir = "job{0:03d}".format(i)
    print("jobdir", jobdir)
    os.makedirs(os.path.join("jobData", jobdir))

# absolute path to the job data; the batch scripts need absolute locations
theJobData = os.path.abspath("jobData")
print("theJobData =", theJobData)
# Preserve the settings of the dataset being added: reading back an existing
# database (append mode) overwrites these lib fields.
tmpBatchScript = lib.batchScript
tmpCfgTemplate = lib.cfgTemplate
tmpInfiList = lib.infiList
tmpClass = lib.classInf
tmpMergeScript = lib.mergeScript
tmpDriver = lib.driver

# TODO(review): the database read-back and the removal of an existing
# trailing merge job were lost from this chunk (original lines ~186-206);
# restore them from the upstream script.
if lib.JOBDIR[lib.nJobs] == "jobm":
    pass  # lost: drop the existing merge-job entry before appending

# Restore the new dataset's settings after the read-back.
lib.batchScript = tmpBatchScript
lib.cfgTemplate = tmpCfgTemplate
lib.infiList = tmpInfiList
lib.classInf = tmpClass
lib.mergeScript = tmpMergeScript
lib.driver = tmpDriver
# Set up every mille job: database entry, input-file split, spliced config
# and batch run script.
for j in xrange(1, args.n_jobs + 1):
    i = j + nJobExist            # absolute job number
    jobdir = "job{0:03d}".format(i)
    theIsn = "{0:03d}".format(i)

    lib.JOBDIR.append(jobdir)
    lib.JOBSTATUS.append("SETUP")
    lib.JOBNTRY.append(0)
    lib.JOBRUNTIME.append(0)
    lib.JOBNEVT.append(0)
    lib.JOBHOST.append("")
    lib.JOBINCR.append(0)
    lib.JOBREMARK.append("")
    lib.JOBSP1.append("")
    if args.weight is not None:
        lib.JOBSP2.append(str(args.weight))
    else:
        lib.JOBSP2.append("")
    lib.JOBSP3.append(args.name)

    # split the input file list; with --max-events every job gets the full
    # list (split 1/1) and the event range is selected via options below
    cmd = ["mps_split.pl", args.input_file_list,
           str(j if args.max_events is None else 1),
           str(args.n_jobs if args.max_events is None else 1)]
    with open("jobData/{}/theSplit".format(jobdir), "w") as f:
        try:
            subprocess.check_call(cmd, stdout = f)
        except subprocess.CalledProcessError:
            # best-effort: mark the job as failed but keep setting up
            print("              split failed")
            lib.JOBSTATUS[i-1] = "FAIL"

    # splice the config template for this job
    cmd = ["mps_splice.py", args.config_template,
           "jobData/{}/theSplit".format(jobdir),
           "jobData/{}/the.py".format(jobdir), theIsn]
    if args.max_events is not None:
        chunk_size = int(args.max_events/args.n_jobs)
        event_options = ["--skip-events", str(chunk_size*(j-1))]
        # the last job processes the remaining events
        max_events = (args.max_events - (args.n_jobs-1)*chunk_size
                      if j == args.n_jobs
                      else chunk_size)
        event_options.extend(["--max-events", str(max_events)])
        cmd.extend(event_options)
    mps_tools.run_checked(cmd)

    # create the batch run script for this job
    print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py "
          "jobData/{}/theSplit {} {} {}".format(
              args.batch_script, jobdir, theJobData, jobdir, jobdir,
              theIsn, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(["mps_script.pl", args.batch_script,
                           "jobData/{}/theScript.sh".format(jobdir),
                           os.path.join(theJobData, jobdir), "the.py",
                           "jobData/{}/theSplit".format(jobdir), theIsn,
                           args.mss_dir, lib.mssDirPool])
# Create the merge-job database entry. This is always done; whether it is
# actually used depends on the "-m" option (lib.driver == "merge").
jobdir = "jobm"
lib.JOBDIR.append(jobdir)
lib.JOBSTATUS.append("SETUP")
lib.JOBNTRY.append(0)
lib.JOBRUNTIME.append(0)
lib.JOBNEVT.append(0)
lib.JOBHOST.append("")
lib.JOBINCR.append(0)
lib.JOBREMARK.append("")
lib.JOBSP1.append("")
lib.JOBSP2.append("")
lib.JOBSP3.append("")

# always recreate the merge-job directory from scratch
shutil.rmtree("jobData/jobm", ignore_errors = True)
os.makedirs("jobData/jobm")
print("Create dir jobData/jobm")
# Total number of mille jobs the merge job has to collect (new + existing).
nJobsMerge = args.n_jobs + nJobExist

# create the merge config
print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(
    args.config_template, theJobData, nJobsMerge))
mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
                       "jobData/jobm/alignment_merge.py",
                       os.path.join(theJobData, "jobm"), str(nJobsMerge)])

# create the merge-job batch script
print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py "
      "{} {} {}".format(args.merge_script, theJobData, nJobsMerge,
                        args.mss_dir, lib.mssDirPool))
mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
                       "jobData/jobm/theScript.sh",
                       os.path.join(theJobData, "jobm"),
                       "alignment_merge.py",
                       str(nJobsMerge), args.mss_dir, lib.mssDirPool])
# Back up the scripts and templates used for this setup into a numbered
# tarball (ScriptsAndCfgNNN.tar) inside jobData.
backups = os.listdir("jobData")
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
existing_backups = [bu_regex.search(item) for item in backups]
existing_backups = [int(bu.group(1))
                    for bu in existing_backups
                    if bu is not None]
# next free backup number
i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
ScriptCfg = os.path.join("jobData", ScriptCfg)
os.makedirs(ScriptCfg)
for f in (args.batch_script, args.config_template, args.input_file_list):
    shutil.copy2(f, ScriptCfg)
if args.setup_merge:
    # NOTE(review): guard restored from the original numbering gap — only a
    # merge setup needs the merge script backed up; verify against upstream.
    shutil.copy2(args.merge_script, ScriptCfg)

with tarfile.open(ScriptCfg+".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)  # keep only the tarball
S & print(S &os, JobReport::InputFile const &f)
static std::string join(char **cmd)