CMS 3D CMS Logo

mps_setup.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 import os
5 import re
6 import sys
7 import shutil
8 import tarfile
9 import argparse
10 import subprocess
11 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
13 
14 
# Command line interface of the mps setup step: six optional settings followed
# by eight mandatory positional arguments (their order is significant).
parser = argparse.ArgumentParser(description="Setup local mps database")

# --- optional arguments -----------------------------------------------------
parser.add_argument(
    "-m", "--setup-merge",
    dest="setup_merge",
    action="store_true",
    default=False,
    help="setup pede merge job")
parser.add_argument(
    "-a", "--append",
    action="store_true",
    default=False,
    help="append jobs to existing list")
parser.add_argument(
    "-M", "--memory",  # seems to be obsolete
    type=int,
    help="memory (MB) to be allocated for pede")
parser.add_argument(
    "-N", "--name",  # remove restrictions on job name?
    help=("name to be assigned to the jobs; Whitespaces and "
          "colons are not allowed"))
parser.add_argument(
    "-w", "--weight",
    type=float,
    help="assign statistical weight")
parser.add_argument(
    "-e", "--max-events",
    dest="max_events",
    type=int,
    help="maximum number of events to process")

# --- positional arguments ---------------------------------------------------
# Registered from a table; the sequence below is the order expected on the
# command line.
_positional_arguments = (
    ("batch_script",
     {"help": "path to the mille batch script template"}),
    ("config_template",
     {"help": "path to the config template"}),
    ("input_file_list",
     {"help": "path to the input file list"}),
    ("n_jobs",
     {"type": int,
      "help": "number of jobs assigned to this dataset"}),
    ("job_class",
     {"help": ("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
               "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
               "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
               "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
               "contains a ':' the part before ':' defines the class for "
               "mille jobs and the part after defines the pede job class")}),
    ("job_name",
     {"help": "name assigned to batch jobs"}),
    ("merge_script",
     {"help": "path to the pede batch script template"}),
    ("mss_dir",
     {"help": "name of the mass storage directory"}),
)
for _argument_name, _argument_options in _positional_arguments:
    parser.add_argument(_argument_name, **_argument_options)

52 
# Parse everything that follows the program name on the command line.
args = parser.parse_args(sys.argv[1:])


# setup mps database
# A fresh job database object is filled attribute by attribute from the
# command line; the table keeps the assignment order explicit.
lib = mpslib.jobdatabase()
if args.setup_merge:
    _driver = "merge"
else:
    _driver = ""
for _attribute, _setting in (
        ("batchScript", args.batch_script),
        ("cfgTemplate", args.config_template),
        ("infiList", args.input_file_list),
        ("nJobs", args.n_jobs),
        ("classInf", args.job_class),
        ("addFiles", args.job_name),   # the 'job_name' argument is stored here
        ("driver", _driver),           # "merge" only if -m/--setup-merge was given
        ("mergeScript", args.merge_script),
        ("mssDirPool", ""),            # may be filled later from 'mss_dir'
        ("mssDir", args.mss_dir),
        ("pedeMem", args.memory)):     # None unless -M/--memory was given
    setattr(lib, _attribute, _setting)

69 
70 
# All three input files must be readable. They are checked in command line
# order; the first unreadable one is reported and the script exits with 1.
for _complaint, _candidate in (
        ("Bad 'batch_script' script name", args.batch_script),
        ("Bad 'config_template' file name", args.config_template),
        ("Bad input list file", args.input_file_list)):
    if os.access(_candidate, os.R_OK):
        continue
    print(_complaint, _candidate)
    sys.exit(1)

# ignore 'append' flag if mps database is not yet created
if not os.access("mps.db", os.R_OK):
    args.append = False

85 
# Job classes accepted for both the mille and the pede step.
_shared_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                   "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                   "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw")

# mille additionally accepts the express queue ...
allowed_mille_classes = _shared_classes + ("cmsexpress",)

# ... while pede additionally accepts the HTCondor "bigmem" flavours.
allowed_pede_classes = _shared_classes + (
    "htcondor_bigmem_espresso",
    "htcondor_bigmem_microcentury",
    "htcondor_bigmem_longlunch",
    "htcondor_bigmem_workday",
    "htcondor_bigmem_tomorrow",
    "htcondor_bigmem_testmatch",
    "htcondor_bigmem_nextweek")

# Validate the mille class first, then the pede class. On the first mismatch
# the allowed values are listed and the script exits with status 1.
for _step, _complaint, _allowed in (
        ("mille", "Bad job class for mille in class", allowed_mille_classes),
        ("pede", "Bad job class for pede in class", allowed_pede_classes)):
    if lib.get_class(_step) in _allowed:
        continue
    print(_complaint, args.job_class)
    print("Allowed classes:")
    for _class_name in _allowed:
        print(" -", _class_name)
    sys.exit(1)

113 
# The merge script is only needed (and only checked) when a merge job is
# requested.
if args.setup_merge:
    if args.merge_script == "":
        # An empty name falls back to "<batch_script>merge".
        args.merge_script = args.batch_script + "merge"
        # Keep the database object in sync with the script actually used;
        # it was filled from the (empty) command line value before.
        lib.mergeScript = args.merge_script
    if not os.access(args.merge_script, os.R_OK):
        print("Bad merge script file name", args.merge_script)
        sys.exit(1)

# A mass storage argument of the form "<pool>:<directory>" is split at the
# first colon; any further colons stay part of the directory. A string that
# contains ':' is never blank, so no separate emptiness check is needed.
if ":" in args.mss_dir:
    lib.mssDirPool, args.mss_dir = args.mss_dir.split(":", 1)
    lib.mssDir = args.mss_dir

126 
pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB

# Try to guess the memory requirements from the pede executable name.
# 2.5GB is used as default otherwise.
# AP - 23.03.2010
cms_process = mps_tools.get_process_object(args.config_template)
_pede_command = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
# Only the file name of the executable matters (eg "pede_4GB"): take what
# follows the last underscore and drop the "GB" unit.
_size_tag = os.path.basename(_pede_command).split("_")[-1].replace("GB", "")
try:
    _size_in_gb = float(_size_tag)
except ValueError:
    # The name carries no usable number.
    pedeMemDef = int(1024*2.5)
else:
    pedeMemDef = 1024*_size_in_gb
    # pedeMemDef must be >= pedeMemMin.
    pedeMemDef = pedeMemMin if pedeMemDef < pedeMemMin else pedeMemDef


# Allocate memory for the pede job.
# The value specified by the user (-M option) prevails on the one evinced from the executable name.
# AP - 23.03.2010
_memory_usable = bool(args.memory) and not args.memory < pedeMemMin
if not _memory_usable:
    # Missing, zero or too small request: fall back to the guessed value.
    print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
    lib.pedeMem = pedeMemDef
    args.memory = pedeMemDef

150 
# Create the job directories
# nJobExist is the highest job number already present (0 if starting fresh).
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    # Append mode, and "jobData" exists
    job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
    existing_jobs = [int(match.group(1))
                     for match in map(job_regex.search, os.listdir("jobData"))
                     if match is not None]
    # "jobData" may exist without any job directory in it; then there is
    # nothing to append to and nJobExist stays 0.
    if existing_jobs:
        nJobExist = max(existing_jobs)

if nJobExist <= 0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
    # Delete all
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0

# New jobs are numbered consecutively after the existing ones.
# (range instead of xrange so the script also runs under Python 3.)
for j in range(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    print("jobdir", jobdir)
    os.makedirs(os.path.join("jobData", jobdir))

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.abspath("jobData")
print("theJobData =", theJobData)

176 
if args.append:
    # Remember the settings of this invocation; they are put back after the
    # existing database has been read.
    _kept_settings = [(_field, getattr(lib, _field))
                      for _field in ("batchScript", "cfgTemplate", "infiList",
                                     "nJobs", "classInf", "mergeScript",
                                     "driver")]

    # Read DB file
    lib.read_db()

    # check if last job is a merge job
    if lib.JOBDIR[lib.nJobs] == "jobm":
        # remove the merge job: drop the last entry of every per-job list
        for _column in ("JOBDIR", "JOBID", "JOBSTATUS", "JOBNTRY",
                        "JOBRUNTIME", "JOBNEVT", "JOBHOST", "JOBINCR",
                        "JOBREMARK", "JOBSP1", "JOBSP2", "JOBSP3"):
            getattr(lib, _column).pop()

    # Restore variables
    for _field, _kept_value in _kept_settings:
        setattr(lib, _field, _kept_value)

214 
215 
# Create (update) the local database
# For every new job: add a database row, then generate its split file, its
# configuration and its run script inside jobData/<jobdir>.
# (range instead of xrange so the script also runs under Python 3.)
for j in range(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    lib.JOBDIR.append(jobdir)
    lib.JOBID.append("")
    lib.JOBSTATUS.append("SETUP")
    lib.JOBNTRY.append(0)
    lib.JOBRUNTIME.append(0)
    lib.JOBNEVT.append(0)
    lib.JOBHOST.append("")
    lib.JOBINCR.append(0)
    lib.JOBREMARK.append("")
    lib.JOBSP1.append("")
    # JOBSP2 holds the statistical weight (as text), JOBSP3 the dataset name.
    lib.JOBSP2.append("" if args.weight is None else str(args.weight))
    lib.JOBSP3.append(args.name)

    # create the split card files
    # With an event limit every job gets the complete file list (part 1 of 1)
    # and the work is divided by event ranges further below.
    cmd = ["mps_split.pl", args.input_file_list,
           str(j if args.max_events is None else 1),
           str(args.n_jobs if args.max_events is None else 1)]
    print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
    with open("jobData/{}/theSplit".format(jobdir), "w") as f:
        try:
            subprocess.check_call(cmd, stdout = f)
        except subprocess.CalledProcessError:
            print(" split failed")
            lib.JOBSTATUS[i-1] = "FAIL"
    theIsn = "{0:03d}".format(i)

    # create the cfg file
    cmd = ["mps_splice.py", args.config_template,
           "jobData/{}/theSplit".format(jobdir),
           "jobData/{}/the.py".format(jobdir), theIsn]
    if args.max_events is not None:
        # Floor division keeps this an exact integer computation under both
        # Python 2 and Python 3 (no detour via float).
        chunk_size = args.max_events // args.n_jobs
        event_options = ["--skip-events", str(chunk_size*(j-1))]
        max_events = (args.max_events - (args.n_jobs-1)*chunk_size
                      if j == args.n_jobs    # last job gets the remaining events
                      else chunk_size)
        event_options.extend(["--max-events", str(max_events)])
        cmd.extend(event_options)
    print(" ".join(cmd))
    mps_tools.run_checked(cmd)

    # create the run script
    print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(["mps_script.pl", args.batch_script,
                           "jobData/{}/theScript.sh".format(jobdir),
                           os.path.join(theJobData, jobdir), "the.py",
                           "jobData/{}/theSplit".format(jobdir), theIsn,
                           args.mss_dir, lib.mssDirPool])

271 
272 
# create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
jobdir = "jobm"
lib.JOBDIR.append(jobdir)
# The remaining per-job lists get the same initial values as a new job,
# except that the two fields used for weight and name stay empty.
for _column, _initial_value in (
        ("JOBID", ""),
        ("JOBSTATUS", "SETUP"),
        ("JOBNTRY", 0),
        ("JOBRUNTIME", 0),
        ("JOBNEVT", 0),
        ("JOBHOST", ""),
        ("JOBINCR", 0),
        ("JOBREMARK", ""),
        ("JOBSP1", ""),
        ("JOBSP2", ""),
        ("JOBSP3", "")):
    getattr(lib, _column).append(_initial_value)

# Store the database before the merge job files are generated.
lib.write_db()

289 
# if merge mode, create the directory and set up contents
if args.setup_merge:
    # Start from an empty merge directory; a missing one is not an error.
    shutil.rmtree("jobData/jobm", ignore_errors = True)
    os.makedirs("jobData/jobm")
    print("Create dir jobData/jobm")

    # We want to merge old and new jobs
    nJobsMerge = args.n_jobs+nJobExist
    _merge_dir = os.path.join(theJobData, "jobm")   # absolute path
    _merge_count = str(nJobsMerge)

    # create merge job cfg
    print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
    _merge_cfg_command = ["mps_merge.py", "-w", args.config_template,
                          "jobData/jobm/alignment_merge.py",
                          _merge_dir, _merge_count]
    mps_tools.run_checked(_merge_cfg_command)

    # create merge job script
    print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
    _merge_script_command = ["mps_scriptm.pl", args.merge_script,
                             "jobData/jobm/theScript.sh",
                             _merge_dir,
                             "alignment_merge.py", _merge_count,
                             args.mss_dir, lib.mssDirPool]
    mps_tools.run_checked(_merge_script_command)

312 
313 
# Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
# in jobData
# Backups are numbered ScriptsAndCfgNNN.tar; the new one gets the number
# following the highest existing one (starting at 001).
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
existing_backups = []
for _entry in os.listdir("jobData"):
    _found = bu_regex.search(_entry)
    if _found is not None:
        existing_backups.append(int(_found.group(1)))
if existing_backups:
    _backup_number = max(existing_backups) + 1
else:
    _backup_number = 1

# Collect the files in a temporary directory named like the archive ...
ScriptCfg = os.path.join("jobData", "ScriptsAndCfg{0:03d}".format(_backup_number))
os.makedirs(ScriptCfg)
_files_to_keep = [args.batch_script, args.config_template, args.input_file_list]
if args.setup_merge:
    _files_to_keep.append(args.merge_script)
for _path in _files_to_keep:
    shutil.copy2(_path, ScriptCfg)

# ... pack it into an uncompressed tar file and remove the directory again.
with tarfile.open(ScriptCfg+".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)

331 
332 
# Write to DB
# The database is stored, read back from disk and the re-read content is
# printed.
lib.write_db()
lib.read_db()
lib.print_memdb()
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
#define str(s)