
mps_setup.py
#!/usr/bin/env python

import os
import re
import sys
import shutil
import tarfile
import argparse
import subprocess
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib


parser = argparse.ArgumentParser(description = "Setup local mps database")
parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
                    action = "store_true", default = False,
                    help = "setup pede merge job")
parser.add_argument("-a", "--append", action = "store_true", default = False,
                    help = "append jobs to existing list")
parser.add_argument("-M", "--memory", type = int, # seems to be obsolete
                    help = "memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name", # remove restrictions on job name?
                    help = ("name to be assigned to the jobs; whitespaces and "
                            "colons are not allowed"))
parser.add_argument("-w", "--weight", type = float,
                    help = "assign statistical weight")
parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
                    help = "maximum number of events to process")

parser.add_argument("batch_script",
                    help = "path to the mille batch script template")
parser.add_argument("config_template",
                    help = "path to the config template")
parser.add_argument("input_file_list",
                    help = "path to the input file list")
parser.add_argument("n_jobs", type = int,
                    help = "number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help = ("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                            "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                            "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                            "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                            "contains a ':' the part before ':' defines the class for "
                            "mille jobs and the part after defines the pede job class"))
parser.add_argument("job_name",
                    help = "name assigned to batch jobs")
parser.add_argument("merge_script",
                    help = "path to the pede batch script template")
parser.add_argument("mss_dir",
                    help = "name of the mass storage directory")

args = parser.parse_args(sys.argv[1:])

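# A minimal usage sketch (the template, list-file and queue names below are
# hypothetical, not taken from this file):
#
#   mps_setup.py -m -N dataset1 -w 1.0 \
#       mps_template.sh universalConfigTemplate.py fileList.txt 50 \
#       cmscaf1nd:cmscafspec1nd mp_test mps_template_pede.sh /some/mss/dir
#
# This would set up 50 mille jobs plus one pede merge job (-m), tag the jobs
# with the name "dataset1" and statistical weight 1.0, and record everything
# in the local mps.db database created below.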

# setup mps database
lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList = args.input_file_list
lib.nJobs = args.n_jobs
lib.classInf = args.job_class
lib.addFiles = args.job_name
lib.driver = "merge" if args.setup_merge else ""
lib.mergeScript = args.merge_script
lib.mssDirPool = ""
lib.mssDir = args.mss_dir
lib.pedeMem = args.memory


if not os.access(args.batch_script, os.R_OK):
    print "Bad 'batch_script' script name", args.batch_script
    sys.exit(1)

if not os.access(args.config_template, os.R_OK):
    print "Bad 'config_template' file name", args.config_template
    sys.exit(1)

if not os.access(args.input_file_list, os.R_OK):
    print "Bad input list file", args.input_file_list
    sys.exit(1)

# ignore 'append' flag if mps database is not yet created
if not os.access("mps.db", os.R_OK): args.append = False

allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                         "cmsexpress")
if lib.get_class("mille") not in allowed_mille_classes:
    print "Bad job class for mille in class", args.job_class
    print "Allowed classes:"
    for mille_class in allowed_mille_classes:
        print " -", mille_class
    sys.exit(1)

allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")
if lib.get_class("pede") not in allowed_pede_classes:
    print "Bad job class for pede in class", args.job_class
    print "Allowed classes:"
    for pede_class in allowed_pede_classes:
        print " -", pede_class
    sys.exit(1)

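# For example (hypothetical queue choice), a job_class of
# "cmscaf1nd:cmscafspec1nd" sends the mille jobs to the cmscaf1nd queue and
# the pede merge job to the cmscafspec1nd queue, while a plain "cmscaf1nd"
# is used for both.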
if args.setup_merge:
    if args.merge_script == "":
        args.merge_script = args.batch_script + "merge"
    if not os.access(args.merge_script, os.R_OK):
        print "Bad merge script file name", args.merge_script
        sys.exit(1)

if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        lib.mssDirPool = args.mss_dir.split(":")
        lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
        lib.mssDir = args.mss_dir
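# For example (hypothetical path), an mss_dir of
# "cmscafuser:/castor/cern.ch/cms/store/caf/user/someuser/MP" is split into
# the pool "cmscafuser" and the directory
# "/castor/cern.ch/cms/store/caf/user/someuser/MP".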

pedeMemMin = 1024 # minimum memory allocated for pede: 1024 MB = 1 GB

# Try to guess the memory requirements from the pede executable name.
# 2.5 GB is used as default otherwise.
# AP - 23.03.2010
cms_process = mps_tools.get_process_object(args.config_template)
pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
pedeMemDef = os.path.basename(pedeMemDef) # the pede executable (file name only, e.g. "pede_4GB")
pedeMemDef = pedeMemDef.split("_")[-1]
pedeMemDef = pedeMemDef.replace("GB", "")
try:
    pedeMemDef = 1024*float(pedeMemDef)
    if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin # pedeMemDef must be >= pedeMemMin
except ValueError:
    pedeMemDef = int(1024*2.5)
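# Worked example: a pedeCommand ending in ".../pede_8GB" yields
# pedeMemDef = 1024*8.0 = 8192 MB; a name without a parsable "<n>GB" suffix
# (e.g. plain "pede") falls back to the 2560 MB default.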


# Allocate memory for the pede job.
# The value specified by the user (-M option) takes precedence over the one
# derived from the executable name.
# AP - 23.03.2010
if not args.memory or args.memory < pedeMemMin:
    print "Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef),
    lib.pedeMem = args.memory = pedeMemDef

# Create the job directories
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    # append mode, and "jobData" exists
    jobs = os.listdir("jobData")
    job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
    existing_jobs = [job_regex.search(item) for item in jobs]
    existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
    nJobExist = sorted(existing_jobs)[-1]
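# For example, if jobData already contains job001 ... job012 from a previous
# setup, nJobExist becomes 12 and the jobs added below are numbered job013
# onwards.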

if nJobExist <= 0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
    # Delete all
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0

for j in xrange(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    print "jobdir", jobdir
    os.makedirs(os.path.join("jobData", jobdir))

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.abspath("jobData")
print "theJobData =", theJobData

if args.append:
    # save current values
    tmpBatchScript = lib.batchScript
    tmpCfgTemplate = lib.cfgTemplate
    tmpInfiList = lib.infiList
    tmpNJobs = lib.nJobs
    tmpClass = lib.classInf
    tmpMergeScript = lib.mergeScript
    tmpDriver = lib.driver

    # Read DB file
    lib.read_db()

    # check if last job is a merge job
    if lib.JOBDIR[lib.nJobs] == "jobm":
        # remove the merge job
        lib.JOBDIR.pop()
        lib.JOBID.pop()
        lib.JOBSTATUS.pop()
        lib.JOBNTRY.pop()
        lib.JOBRUNTIME.pop()
        lib.JOBNEVT.pop()
        lib.JOBHOST.pop()
        lib.JOBINCR.pop()
        lib.JOBREMARK.pop()
        lib.JOBSP1.pop()
        lib.JOBSP2.pop()
        lib.JOBSP3.pop()

    # Restore variables
    lib.batchScript = tmpBatchScript
    lib.cfgTemplate = tmpCfgTemplate
    lib.infiList = tmpInfiList
    lib.nJobs = tmpNJobs
    lib.classInf = tmpClass
    lib.mergeScript = tmpMergeScript
    lib.driver = tmpDriver


# Create (update) the local database
for j in xrange(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    lib.JOBDIR.append(jobdir)
    lib.JOBID.append("")
    lib.JOBSTATUS.append("SETUP")
    lib.JOBNTRY.append(0)
    lib.JOBRUNTIME.append(0)
    lib.JOBNEVT.append(0)
    lib.JOBHOST.append("")
    lib.JOBINCR.append(0)
    lib.JOBREMARK.append("")
    lib.JOBSP1.append("")
    if args.weight is not None:
        lib.JOBSP2.append(str(args.weight))
    else:
        lib.JOBSP2.append("")
    lib.JOBSP3.append(args.name)

    # create the split card files
    cmd = ["mps_split.pl", args.input_file_list,
           str(j if args.max_events is None else 1),
           str(args.n_jobs if args.max_events is None else 1)]
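    # Without -e/--max-events, mps_split.pl is asked for the j-th of n_jobs
    # slices of the input file list (redirected to theSplit below); with -e
    # every job is given the full list (slice 1 of 1) and the event range is
    # restricted via the --skip-events/--max-events options added further down.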
    print " ".join(cmd)+" > jobData/{}/theSplit".format(jobdir)
    with open("jobData/{}/theSplit".format(jobdir), "w") as f:
        try:
            subprocess.check_call(cmd, stdout = f)
        except subprocess.CalledProcessError:
            print " split failed"
            lib.JOBSTATUS[i-1] = "FAIL"
    theIsn = "{0:03d}".format(i)

    # create the cfg file
    cmd = ["mps_splice.py", args.config_template,
           "jobData/{}/theSplit".format(jobdir),
           "jobData/{}/the.py".format(jobdir), theIsn]
    if args.max_events is not None:
        chunk_size = int(args.max_events/args.n_jobs)
        event_options = ["--skip-events", str(chunk_size*(j-1))]
        max_events = (args.max_events - (args.n_jobs-1)*chunk_size
                      if j == args.n_jobs # last job gets the remaining events
                      else chunk_size)
        event_options.extend(["--max-events", str(max_events)])
        cmd.extend(event_options)
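    # Worked example: with --max-events 1000 and 3 jobs, chunk_size = 333;
    # job 1 skips 0 events and processes 333, job 2 skips 333 and processes
    # 333, and job 3 skips 666 and processes the remaining 334.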
    print " ".join(cmd)
    mps_tools.run_checked(cmd)

    # create the run script
    print "mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool)
    mps_tools.run_checked(["mps_script.pl", args.batch_script,
                           "jobData/{}/theScript.sh".format(jobdir),
                           os.path.join(theJobData, jobdir), "the.py",
                           "jobData/{}/theSplit".format(jobdir), theIsn,
                           args.mss_dir, lib.mssDirPool])


# create the merge job entry; this is always done, but whether it is used depends on the "merge" option
jobdir = "jobm"
lib.JOBDIR.append(jobdir)
lib.JOBID.append("")
lib.JOBSTATUS.append("SETUP")
lib.JOBNTRY.append(0)
lib.JOBRUNTIME.append(0)
lib.JOBNEVT.append(0)
lib.JOBHOST.append("")
lib.JOBINCR.append(0)
lib.JOBREMARK.append("")
lib.JOBSP1.append("")
lib.JOBSP2.append("")
lib.JOBSP3.append("")

lib.write_db()

# if merge mode, create the directory and set up contents
if args.setup_merge:
    shutil.rmtree("jobData/jobm", ignore_errors = True)
    os.makedirs("jobData/jobm")
    print "Create dir jobData/jobm"

    # We want to merge old and new jobs
    nJobsMerge = args.n_jobs+nJobExist

    # create merge job cfg
    print "mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge)
    mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
                           "jobData/jobm/alignment_merge.py",
                           os.path.join(theJobData, "jobm"), str(nJobsMerge)])

    # create merge job script
    print "mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool)
    mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
                           "jobData/jobm/theScript.sh",
                           os.path.join(theJobData, "jobm"),
                           "alignment_merge.py", str(nJobsMerge), args.mss_dir,
                           lib.mssDirPool])


# Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
# in jobData
backups = os.listdir("jobData")
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
existing_backups = [bu_regex.search(item) for item in backups]
existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
ScriptCfg = os.path.join("jobData", ScriptCfg)
os.makedirs(ScriptCfg)
for f in (args.batch_script, args.config_template, args.input_file_list):
    shutil.copy2(f, ScriptCfg)
if args.setup_merge:
    shutil.copy2(args.merge_script, ScriptCfg)

with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)


# Write to DB
lib.write_db()
lib.read_db()
lib.print_memdb()
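After a successful run the working directory contains a jobData/ tree with one jobNNN directory per mille job (each holding theSplit, the.py and theScript.sh), optionally a jobData/jobm directory with alignment_merge.py and theScript.sh for the pede merge step when -m is given, a jobData/ScriptsAndCfgNNN.tar backup of the templates, and the mps.db bookkeeping file written by lib.write_db().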