10 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools
as mps_tools
11 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass
as mpslib
# Command-line interface.  The option set mirrors the original exactly:
# optional switches first, then the positional arguments in the same order.
parser = argparse.ArgumentParser(description="Setup local mps database")
parser.add_argument("-m", "--setup-merge", dest="setup_merge",
                    action="store_true", default=False,
                    help="setup pede merge job")
parser.add_argument("-a", "--append", action="store_true", default=False,
                    help="append jobs to existing list")
parser.add_argument("-M", "--memory", type=int,
                    help="memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name",
                    help=("name to be assigned to the jobs; Whitespaces and "
                          "colons are not allowed"))
parser.add_argument("-w", "--weight", type=float,
                    help="assign statistical weight")
parser.add_argument("-e", "--max-events", dest="max_events", type=int,
                    help="maximum number of events to process")

parser.add_argument("batch_script",
                    help="path to the mille batch script template")
parser.add_argument("config_template",
                    help="path to the config template")
parser.add_argument("input_file_list",
                    help="path to the input file list")
parser.add_argument("n_jobs", type=int,
                    help="number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                          "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                          "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                          "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                          "contains a ':' the part before ':' defines the class for "
                          "mille jobs and the part after defines the pede job class"))
parser.add_argument("job_name",
                    help="name assigned to batch jobs")
parser.add_argument("merge_script",
                    help="path to the pede batch script template")
parser.add_argument("mss_dir",
                    help="name of the mass storage directory")

# parse the command-line arguments of this script
args = parser.parse_args(sys.argv[1:])
# Fill the local job database object with the command-line values.
lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList    = args.input_file_list
lib.nJobs       = args.n_jobs
lib.classInf    = args.job_class
lib.addFiles    = args.job_name
# an empty driver string means a plain mille setup; "merge" requests pede
lib.driver      = "merge" if args.setup_merge else ""
lib.mergeScript = args.merge_script
lib.mssDir      = args.mss_dir
lib.pedeMem     = args.memory
# ---------------------------------------------------------------------------
# Sanity-check the files given on the command line.
# NOTE(review): in this chunk each failed check only printed an error and fell
# through (the original exit lines are missing); continuing with unreadable
# inputs cannot work, so each check now aborts with status 1 — confirm against
# the complete file.
# ---------------------------------------------------------------------------
if not os.access(args.batch_script, os.R_OK):
    print("Bad 'batch_script' script name " + args.batch_script)
    sys.exit(1)

if not os.access(args.config_template, os.R_OK):
    print("Bad 'config_template' file name " + args.config_template)
    sys.exit(1)

if not os.access(args.input_file_list, os.R_OK):
    print("Bad input list file " + args.input_file_list)
    sys.exit(1)

# Without an existing database file there is nothing to append to.
if not os.access("mps.db", os.R_OK): args.append = False
"lxplus",
"cmscaf1nh",
"cmscaf1nd",
"cmscaf1nw",
86 "cmscafspec1nh",
"cmscafspec1nd",
"cmscafspec1nw",
87 "8nm",
"1nh",
"8nh",
"1nd",
"2nd",
"1nw",
"2nw",
89 if lib.get_class(
"mille")
not in allowed_mille_classes:
90 print "Bad job class for mille in class", args.job_class
91 print "Allowed classes:" 92 for mille_class
in allowed_mille_classes:
93 print " -", mille_class
# Batch classes accepted for the pede step: the mille classes plus the
# HTCondor big-memory flavours.
allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")
if lib.get_class("pede") not in allowed_pede_classes:
    print("Bad job class for pede in class " + args.job_class)
    print("Allowed classes:")
    for pede_class in allowed_pede_classes:
        print(" - " + pede_class)
    # abort: the visible code printed the error but did not act on it
    # (the original exit line is missing from this chunk — confirm)
    sys.exit(1)
# Default the merge script to "<batch_script>merge" when none was given,
# then verify that it is readable.
if args.merge_script == "":
    args.merge_script = args.batch_script + "merge"
if not os.access(args.merge_script, os.R_OK):
    print("Bad merge script file name " + args.merge_script)
    # abort: the original exit line is missing from this chunk; continuing
    # with an unreadable merge script cannot work — confirm against the file
    sys.exit(1)
# Split an optional "pool:path" mass-storage specification: the text before
# the first ':' is the storage pool, everything after it is the directory.
if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        pool_name, _, mss_path = args.mss_dir.partition(":")
        lib.mssDirPool = pool_name
        args.mss_dir = mss_path
        lib.mssDir = args.mss_dir
# ---------------------------------------------------------------------------
# Determine the pede memory requirement (MB) from the pede executable name in
# the config template, e.g. ".../pede_8GB" -> 8 GB -> MB.
# NOTE(review): this chunk is garbled/incomplete — several original lines
# (130, 136, 139, 141-145) are missing.  The "140" line looks like the
# fallback branch of a try/except around the config parsing (2.5 GB default),
# and `pedeMemMin` is used before any visible definition — confirm both
# against the complete file before changing anything here.
# ---------------------------------------------------------------------------
131 cms_process = mps_tools.get_process_object(args.config_template)
132 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
133 pedeMemDef = os.path.basename(pedeMemDef)
# keep only the trailing "<N>GB" token of the command name
134 pedeMemDef = pedeMemDef.split(
"_")[-1]
135 pedeMemDef = pedeMemDef.replace(
"GB",
"")
# convert GB -> MB and clamp to the minimum request
137 pedeMemDef = 1024*
float(pedeMemDef)
138 if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
# presumably the except-branch default (2.5 GB) — TODO confirm
140 pedeMemDef =
int(1024*2.5)
# fall back to the default when no (or too small a) request was given;
# trailing comma keeps the next print on the same line (Python 2)
146 if not args.memory
or args.memory < pedeMemMin:
147 print "Memory request ({}) is < {}, using {}.".
format(args.memory, pedeMemMin, pedeMemDef),
148 lib.pedeMem = args.memory = pedeMemDef
# ---------------------------------------------------------------------------
# In append mode, find the highest existing job number under jobData/ so the
# new jobs are numbered after it; otherwise (or when the numbering is out of
# range) start over with a fresh jobData directory.
# ---------------------------------------------------------------------------
nJobExist = 0  # highest jobNNN found; 0 when not appending / none present
if args.append and os.path.isdir("jobData"):
    job_regex = re.compile(r"job([0-9]{3})")
    matches = (job_regex.search(item) for item in os.listdir("jobData"))
    existing_jobs = [int(m.group(1)) for m in matches if m is not None]
    # Guard against a directory without jobNNN entries: the original
    # `sorted(existing_jobs)[-1]` raised IndexError on an empty list.
    if existing_jobs:
        nJobExist = max(existing_jobs)

# `== 0` in the original condition was redundant with `<= 0`
if nJobExist <= 0 or nJobExist > 999:
    # create a new directory from scratch
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0
# Create one jobData/jobNNN directory per new mille job, numbering after any
# pre-existing jobs.
for j in range(1, args.n_jobs + 1):
    # NOTE(review): the original lines defining `jobdir` are missing from
    # this chunk (the visible code would raise NameError); numbering
    # presumably continues after the existing jobs — confirm.
    jobdir = "job{0:03d}".format(j + nJobExist)
    print("jobdir " + jobdir)
    os.makedirs(os.path.join("jobData", jobdir))

# absolute path to the job data, used when generating the batch scripts
theJobData = os.path.abspath("jobData")
print("theJobData = " + theJobData)
# ---------------------------------------------------------------------------
# Save the current job defaults before handling an existing "jobm" merge
# entry, and restore them afterwards.
# NOTE(review): this chunk is incomplete — original lines 181, 185-189 and
# 191-205 (including the entire body of the "jobm" branch) are missing, so
# the branch logic cannot be reviewed from here.
# ---------------------------------------------------------------------------
178 tmpBatchScript = lib.batchScript
179 tmpCfgTemplate = lib.cfgTemplate
180 tmpInfiList = lib.infiList
182 tmpClass = lib.classInf
183 tmpMergeScript = lib.mergeScript
184 tmpDriver = lib.driver
# last database entry is an existing merge job — body missing from chunk
190 if lib.JOBDIR[lib.nJobs] ==
"jobm":
# restore the saved defaults
206 lib.batchScript = tmpBatchScript
207 lib.cfgTemplate = tmpCfgTemplate
208 lib.infiList = tmpInfiList
210 lib.classInf = tmpClass
211 lib.mergeScript = tmpMergeScript
212 lib.driver = tmpDriver
# ---------------------------------------------------------------------------
# Main setup loop: for each new mille job, register it in the database,
# split the input file list, splice the config template, and generate the
# batch script via the mps_* helper tools.
# NOTE(review): this chunk is incomplete — original lines 217 (presumably
# `i = j + nJobExist`), 220, 231, 234-235, 241 (the `try:` matching the
# visible `except`), 246-248, 256-257, 260 and 262-263 are missing, and the
# expression at "255" has an unbalanced parenthesis.  `i` and `theIsn` are
# used without a visible definition.  Left byte-identical; confirm against
# the complete file before editing.
# ---------------------------------------------------------------------------
216 for j
in xrange(1, args.n_jobs + 1):
# job directory name derived from the (missing) job index `i`
218 jobdir =
"job{0:03d}".
format(i)
# register the new job with default bookkeeping values
219 lib.JOBDIR.append(jobdir)
221 lib.JOBSTATUS.append(
"SETUP")
222 lib.JOBNTRY.append(0)
223 lib.JOBRUNTIME.append(0)
224 lib.JOBNEVT.append(0)
225 lib.JOBHOST.append(
"")
226 lib.JOBINCR.append(0)
227 lib.JOBREMARK.append(
"")
228 lib.JOBSP1.append(
"")
# optional statistical weight goes into the SP2 column
229 if args.weight
is not None:
230 lib.JOBSP2.append(
str(args.weight))
232 lib.JOBSP2.append(
"")
233 lib.JOBSP3.append(args.name)
# split the input file list; with --max-events everything goes to one split
236 cmd = [
"mps_split.pl", args.input_file_list,
237 str(j
if args.max_events
is None else 1),
238 str(args.n_jobs
if args.max_events
is None else 1)]
239 print " ".
join(cmd)+
" > jobData/{}/theSplit".
format(jobdir)
# write the split output; the matching `try:` line is missing from the chunk
240 with open(
"jobData/{}/theSplit".
format(jobdir),
"w")
as f:
242 subprocess.check_call(cmd, stdout = f)
243 except subprocess.CalledProcessError:
244 print " split failed" 245 lib.JOBSTATUS[i-1] =
"FAIL" 249 cmd = [
"mps_splice.py", args.config_template,
250 "jobData/{}/theSplit".
format(jobdir),
251 "jobData/{}/the.py".
format(jobdir), theIsn]
# distribute --max-events across the jobs: equal chunks, remainder to last
252 if args.max_events
is not None:
253 chunk_size =
int(args.max_events/args.n_jobs)
254 event_options = [
"--skip-events",
str(chunk_size*(j-1))]
255 max_events = (args.max_events - (args.n_jobs-1)*chunk_size
258 event_options.extend([
"--max-events",
str(max_events)])
259 cmd.extend(event_options)
261 mps_tools.run_checked(cmd)
# generate the batch script for this job
264 print "mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".
format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool)
265 mps_tools.run_checked([
"mps_script.pl", args.batch_script,
266 "jobData/{}/theScript.sh".
format(jobdir),
267 os.path.join(theJobData, jobdir),
"the.py",
268 "jobData/{}/theSplit".
format(jobdir), theIsn,
269 args.mss_dir, lib.mssDirPool])
# ---------------------------------------------------------------------------
# Merge (pede) job setup: register the "jobm" entry in the database, create
# its directory and generate the merge config and batch script.
# NOTE(review): this chunk is incomplete — original lines 270-273 (presumably
# the `if lib.driver == "merge":` guard and `jobdir = "jobm"`), 275, 286-290,
# 294-298, 303-304 and 310 (the closing `lib.mssDirPool])` of the final call)
# are missing.  Left byte-identical; confirm against the complete file.
# ---------------------------------------------------------------------------
274 lib.JOBDIR.append(jobdir)
276 lib.JOBSTATUS.append(
"SETUP")
277 lib.JOBNTRY.append(0)
278 lib.JOBRUNTIME.append(0)
279 lib.JOBNEVT.append(0)
280 lib.JOBHOST.append(
"")
281 lib.JOBINCR.append(0)
282 lib.JOBREMARK.append(
"")
283 lib.JOBSP1.append(
"")
284 lib.JOBSP2.append(
"")
285 lib.JOBSP3.append(
"")
# recreate the merge-job directory from scratch
291 shutil.rmtree(
"jobData/jobm", ignore_errors =
True)
292 os.makedirs(
"jobData/jobm")
293 print "Create dir jobData/jobm" 296 nJobsMerge = args.n_jobs+nJobExist
# generate the merge config over all (new + existing) jobs
299 print "mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".
format(args.config_template, theJobData, nJobsMerge)
300 mps_tools.run_checked([
"mps_merge.py",
"-w", args.config_template,
301 "jobData/jobm/alignment_merge.py",
302 os.path.join(theJobData,
"jobm"),
str(nJobsMerge)])
# generate the merge batch script
305 print "mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".
format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool)
306 mps_tools.run_checked([
"mps_scriptm.pl", args.merge_script,
307 "jobData/jobm/theScript.sh",
308 os.path.join(theJobData,
"jobm"),
309 "alignment_merge.py",
str(nJobsMerge), args.mss_dir,
# ---------------------------------------------------------------------------
# Archive the scripts and configuration used for this setup in a numbered
# tarball under jobData/ (ScriptsAndCfgNNN.tar), then remove the staging
# directory again.
# NOTE(review): an original line is missing just before the merge-script
# copy; it presumably guarded that copy with `if args.setup_merge:` —
# confirm against the complete file.
# ---------------------------------------------------------------------------
backup_candidates = os.listdir("jobData")
backup_pattern = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
backup_numbers = [int(match.group(1))
                  for match in map(backup_pattern.search, backup_candidates)
                  if match is not None]
# next free backup index (001 when no backup exists yet)
next_index = max(backup_numbers) + 1 if backup_numbers else 1
ScriptCfg = os.path.join("jobData", "ScriptsAndCfg{0:03d}".format(next_index))
os.makedirs(ScriptCfg)
for source_file in (args.batch_script, args.config_template,
                    args.input_file_list):
    shutil.copy2(source_file, ScriptCfg)
shutil.copy2(args.merge_script, ScriptCfg)
with tarfile.open(ScriptCfg + ".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)
static std::string join(char **cmd)