CMS 3D CMS Logo

mps_setup.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 import os
5 import re
6 import sys
7 import shutil
8 import tarfile
9 import argparse
10 import subprocess
11 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
13 
14 
# ----------------------------------------------------------------------------
# Command-line interface: options first, then the positional arguments.
parser = argparse.ArgumentParser(description = "Setup local mps database")

# optional switches
parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
                    action = "store_true", default = False,
                    help = "setup pede merge job")
parser.add_argument("-a", "--append",
                    action = "store_true", default = False,
                    help = "append jobs to existing list")
parser.add_argument("-M", "--memory", type = int,  # seems to be obsolete
                    help = "memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name",  # remove restrictions on job name?
                    help = ("name to be assigned to the jobs; "
                            "Whitespaces and colons are not allowed"))
parser.add_argument("-w", "--weight", type = float,
                    help = "assign statistical weight")
parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
                    help = "maximum number of events to process")

# positional arguments
parser.add_argument("batch_script",
                    help = "path to the mille batch script template")
parser.add_argument("config_template",
                    help = "path to the config template")
parser.add_argument("input_file_list",
                    help = "path to the input file list")
parser.add_argument("n_jobs", type = int,
                    help = "number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help = ("can be any of the normal LSF queues "
                            "(8nm, 1nh, 8nh, 1nd, 2nd, 1nw, 2nw), special CAF queues "
                            "(cmscaf1nh, cmscaf1nd, cmscaf1nw) and special CAF pede "
                            "queues (cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); "
                            "if it contains a ':' the part before ':' defines the "
                            "class for mille jobs and the part after defines the "
                            "pede job class"))
parser.add_argument("job_name",
                    help = "name assigned to batch jobs")
parser.add_argument("merge_script",
                    help = "path to the pede batch script template")
parser.add_argument("mss_dir",
                    help = "name of the mass storage directory")

args = parser.parse_args(sys.argv[1:])
54 
55 
# setup mps database: transfer the command-line values into a fresh job DB
lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList    = args.input_file_list
lib.nJobs       = args.n_jobs
lib.classInf    = args.job_class
lib.addFiles    = args.job_name
if args.setup_merge:
    lib.driver = "merge"
else:
    lib.driver = ""
lib.mergeScript = args.merge_script
lib.mssDirPool  = ""
lib.mssDir      = args.mss_dir
lib.pedeMem     = args.memory
69 
70 
# Sanity-check the three input files; bail out on the first unreadable one.
_required_inputs = (
    (args.batch_script,    "Bad 'batch_script' script name"),
    (args.config_template, "Bad 'config_template' file name"),
    (args.input_file_list, "Bad input list file"),
)
for _path, _message in _required_inputs:
    if not os.access(_path, os.R_OK):
        print(_message, _path)
        sys.exit(1)

# ignore 'append' flag if mps database is not yet created
if not os.access("mps.db", os.R_OK):
    args.append = False
85 
allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                         "cmsexpress","htcondor_espresso","htcondor_microcentury",
                         "htcondor_longlunch","htcondor_workday","htcondor_tomorrow",
                         "htcondor_testmatch","htcondor_nextweek")

allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")

def _check_job_class(kind, allowed):
    """Exit with an error listing if the requested class for `kind` jobs
    ('mille' or 'pede') is not one of the `allowed` batch classes.

    Factored out because the mille and pede checks were identical
    copy-pasted blocks; messages are unchanged.
    """
    if lib.get_class(kind) not in allowed:
        print("Bad job class for {} in class".format(kind), args.job_class)
        print("Allowed classes:")
        for allowed_class in allowed:
            print(" -", allowed_class)
        sys.exit(1)

_check_job_class("mille", allowed_mille_classes)
_check_job_class("pede", allowed_pede_classes)
115 
# In merge mode, derive a default merge script from the batch script if
# none was given, and verify that the script is readable.
if args.setup_merge:
    if args.merge_script == "":
        args.merge_script = args.batch_script + "merge"
    if not os.access(args.merge_script, os.R_OK):
        print("Bad merge script file name", args.merge_script)
        sys.exit(1)

# A mass-storage directory of the form "pool:path" is split into the
# pool name and the remaining path (which may itself contain colons).
if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        pool, _, remainder = args.mss_dir.partition(":")
        lib.mssDirPool = pool
        args.mss_dir = remainder
        lib.mssDir = args.mss_dir
128 
pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB

# Try to guess the memory requirements from the pede executable name
# (e.g. "pede_4GB" -> 4096 MB); 2.5GB is used as default otherwise.
# AP - 23.03.2010
cms_process = mps_tools.get_process_object(args.config_template)
pede_command = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
# keep only the executable file name, then its trailing "_<n>GB" token
mem_token = os.path.basename(pede_command).split("_")[-1].replace("GB", "")
try:
    pedeMemDef = 1024*float(mem_token)
    # pedeMemDef must be >= pedeMemMin
    pedeMemDef = max(pedeMemDef, pedeMemMin)
except ValueError:
    # executable name carries no parsable size -> default to 2.5 GB
    pedeMemDef = int(1024*2.5)


# Allocate memory for the pede job.
# The value specified by the user (-M option) prevails on the one evinced from the executable name.
# AP - 23.03.2010
if not args.memory or args.memory < pedeMemMin:
    print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
    lib.pedeMem = args.memory = pedeMemDef
152 
# Create the job directories.
# Fixes vs. the original: `xrange` (Python-2-only, NameError on Python 3)
# replaced by `range`; the redundant `nJobExist == 0 or nJobExist <= 0`
# condition simplified; and an IndexError on `sorted([])[-1]` avoided when
# append mode finds a "jobData" directory containing no jobNNN entries.
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    # Append mode, and "jobData" exists: continue after the highest job number
    jobs = os.listdir("jobData")
    job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
    existing_jobs = [job_regex.search(item) for item in jobs]
    existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
    nJobExist = max(existing_jobs) if existing_jobs else 0

if nJobExist <= 0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
    # Delete all and start from scratch
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0

for j in range(1, args.n_jobs + 1):
    i = j + nJobExist
    jobdir = "job{0:03d}".format(i)
    print("jobdir", jobdir)
    os.makedirs(os.path.join("jobData", jobdir))
174 
# build the absolute job directory path (needed by mps_script)
theJobData = os.path.abspath("jobData")
print("theJobData =", theJobData)

if args.append:
    # Reading the DB overwrites the settings coming from the command line,
    # so stash them first and restore them afterwards.
    saved_settings = (lib.batchScript, lib.cfgTemplate, lib.infiList,
                      lib.nJobs, lib.classInf, lib.mergeScript, lib.driver)

    # Read DB file
    lib.read_db()

    # If the last job is a merge job, drop it: a new one is appended later.
    if lib.JOBDIR[lib.nJobs] == "jobm":
        for job_field in (lib.JOBDIR, lib.JOBID, lib.JOBSTATUS, lib.JOBNTRY,
                          lib.JOBRUNTIME, lib.JOBNEVT, lib.JOBHOST,
                          lib.JOBINCR, lib.JOBREMARK, lib.JOBSP1,
                          lib.JOBSP2, lib.JOBSP3):
            job_field.pop()

    # Restore the command-line settings
    (lib.batchScript, lib.cfgTemplate, lib.infiList,
     lib.nJobs, lib.classInf, lib.mergeScript, lib.driver) = saved_settings
216 
217 
# Create (update) the local database: one entry per mille job, plus the
# split file list, spliced cfg and run script for each job directory.
# Fix vs. the original: `xrange` (Python-2-only, NameError on Python 3)
# replaced by `range`; everything else is unchanged.
for j in range(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    lib.JOBDIR.append(jobdir)
    lib.JOBID.append("")
    lib.JOBSTATUS.append("SETUP")
    lib.JOBNTRY.append(0)
    lib.JOBRUNTIME.append(0)
    lib.JOBNEVT.append(0)
    lib.JOBHOST.append("")
    lib.JOBINCR.append(0)
    lib.JOBREMARK.append("")
    lib.JOBSP1.append("")
    if args.weight is not None:
        lib.JOBSP2.append(str(args.weight))
    else:
        lib.JOBSP2.append("")
    lib.JOBSP3.append(args.name)

    # create the split card files; with --max-events every job reads the
    # whole list (1/1) and event skipping is done via cmsRun options below
    cmd = ["mps_split.pl", args.input_file_list,
           str(j if args.max_events is None else 1),
           str(args.n_jobs if args.max_events is None else 1)]
    print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
    with open("jobData/{}/theSplit".format(jobdir), "w") as f:
        try:
            subprocess.check_call(cmd, stdout = f)
        except subprocess.CalledProcessError:
            print(" split failed")
            lib.JOBSTATUS[i-1] = "FAIL"
    theIsn = "{0:03d}".format(i)

    # create the cfg file
    cmd = ["mps_splice.py", args.config_template,
           "jobData/{}/theSplit".format(jobdir),
           "jobData/{}/the.py".format(jobdir), theIsn]
    if args.max_events is not None:
        chunk_size = int(args.max_events/args.n_jobs)
        event_options = ["--skip-events", str(chunk_size*(j-1))]
        max_events = (args.max_events - (args.n_jobs-1)*chunk_size
                      if j == args.n_jobs # last job gets the remaining events
                      else chunk_size)
        event_options.extend(["--max-events", str(max_events)])
        cmd.extend(event_options)
    print(" ".join(cmd))
    mps_tools.run_checked(cmd)

    # create the run script
    print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(["mps_script.pl", args.batch_script,
                           "jobData/{}/theScript.sh".format(jobdir),
                           os.path.join(theJobData, jobdir), "the.py",
                           "jobData/{}/theSplit".format(jobdir), theIsn,
                           args.mss_dir, lib.mssDirPool])
273 
274 
# create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
jobdir = "jobm"
lib.JOBDIR.append(jobdir)
lib.JOBSTATUS.append("SETUP")
# all string-valued fields start out empty ...
for string_field in (lib.JOBID, lib.JOBHOST, lib.JOBREMARK,
                     lib.JOBSP1, lib.JOBSP2, lib.JOBSP3):
    string_field.append("")
# ... and all counters start at zero
for counter_field in (lib.JOBNTRY, lib.JOBRUNTIME, lib.JOBNEVT, lib.JOBINCR):
    counter_field.append(0)

lib.write_db()
291 
# if merge mode, create the directory and set up contents
if args.setup_merge:
    shutil.rmtree("jobData/jobm", ignore_errors = True)
    os.makedirs("jobData/jobm")
    print("Create dir jobData/jobm")

    # We want to merge old and new jobs
    nJobsMerge = args.n_jobs+nJobExist

    # create merge job cfg
    merge_cfg_cmd = ["mps_merge.py", "-w", args.config_template,
                     "jobData/jobm/alignment_merge.py",
                     os.path.join(theJobData, "jobm"), str(nJobsMerge)]
    print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
    mps_tools.run_checked(merge_cfg_cmd)

    # create merge job script
    merge_script_cmd = ["mps_scriptm.pl", args.merge_script,
                        "jobData/jobm/theScript.sh",
                        os.path.join(theJobData, "jobm"),
                        "alignment_merge.py", str(nJobsMerge), args.mss_dir,
                        lib.mssDirPool]
    print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(merge_script_cmd)
314 
315 
# Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
# in jobData
backups = os.listdir("jobData")
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
matches = (bu_regex.search(item) for item in backups)
existing_backups = sorted(int(m.group(1)) for m in matches if m is not None)
# next free backup number: one past the highest existing one (or 1)
i = (existing_backups[-1] if existing_backups else 0) + 1
ScriptCfg = os.path.join("jobData", "ScriptsAndCfg{0:03d}".format(i))
os.makedirs(ScriptCfg)
files_to_back_up = [args.batch_script, args.config_template, args.input_file_list]
if args.setup_merge:
    files_to_back_up.append(args.merge_script)
for f in files_to_back_up:
    shutil.copy2(f, ScriptCfg)

# pack the staging directory into a tarball, then remove the directory
with tarfile.open(ScriptCfg+".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)
333 
334 
# Write to DB, then read it back and dump it as a final consistency check
lib.write_db()
lib.read_db()
lib.print_memdb()
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
#define str(s)