CMS 3D CMS Logo

mps_setup.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 from builtins import range
5 import os
6 import re
7 import sys
8 import shutil
9 import tarfile
10 import argparse
11 import subprocess
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
13 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
14 
15 
16 parser = argparse.ArgumentParser(description = "Setup local mps database")
17 parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
18  action = "store_true", default = False,
19  help = "setup pede merge job")
20 parser.add_argument("-a", "--append", action = "store_true", default = False,
21  help = "append jobs to existing list")
22 parser.add_argument("-M", "--memory", type = int, # seems to be obsolete
23  help = "memory (MB) to be allocated for pede")
24 parser.add_argument("-N", "--name", # remove restrictions on job name?
25  help = ("name to be assigned to the jobs; Whitespaces and "
26  "colons are not allowed"))
27 parser.add_argument("-w", "--weight", type = float,
28  help = "assign statistical weight")
29 parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
30  help = "maximum number of events to process")
31 
32 parser.add_argument("batch_script",
33  help = "path to the mille batch script template")
34 parser.add_argument("config_template",
35  help = "path to the config template")
36 parser.add_argument("input_file_list",
37  help = "path to the input file list")
38 parser.add_argument("n_jobs", type = int,
39  help = "number of jobs assigned to this dataset")
40 parser.add_argument("job_class",
41  help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
42  "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
43  "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
44  "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
45  "contains a ':' the part before ':' defines the class for "
46  "mille jobs and the part after defines the pede job class"))
47 parser.add_argument("job_name",
48  help = "name assigned to batch jobs")
49 parser.add_argument("merge_script",
50  help = "path to the pede batch script template")
51 parser.add_argument("mss_dir",
52  help = "name of the mass storage directory")
53 
54 args = parser.parse_args(sys.argv[1:])
55 
56 
57 # setup mps database
58 lib = mpslib.jobdatabase()
59 lib.batchScript = args.batch_script
60 lib.cfgTemplate = args.config_template
61 lib.infiList = args.input_file_list
62 lib.nJobs = args.n_jobs
63 lib.classInf = args.job_class
64 lib.addFiles = args.job_name
65 lib.driver = "merge" if args.setup_merge else ""
66 lib.mergeScript = args.merge_script
67 lib.mssDirPool = ""
68 lib.mssDir = args.mss_dir
69 lib.pedeMem = args.memory
70 
71 
72 if not os.access(args.batch_script, os.R_OK):
73  print("Bad 'batch_script' script name", args.batch_script)
74  sys.exit(1)
75 
76 if not os.access(args.config_template, os.R_OK):
77  print("Bad 'config_template' file name", args.config_template)
78  sys.exit(1)
79 
80 if not os.access(args.input_file_list, os.R_OK):
81  print("Bad input list file", args.input_file_list)
82  sys.exit(1)
83 
84 # ignore 'append' flag if mps database is not yet created
85 if not os.access("mps.db", os.R_OK): args.append = False
86 
87 allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
88  "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
89  "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
90  "cmsexpress","htcondor_espresso","htcondor_microcentury",
91  "htcondor_longlunch","htcondor_workday","htcondor_tomorrow",
92  "htcondor_testmatch","htcondor_nextweek")
93 if lib.get_class("mille") not in allowed_mille_classes:
94  print("Bad job class for mille in class", args.job_class)
95  print("Allowed classes:")
96  for mille_class in allowed_mille_classes:
97  print(" -", mille_class)
98  sys.exit(1)
99 
100 allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
101  "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
102  "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
103  "htcondor_bigmem_espresso",
104  "htcondor_bigmem_microcentury",
105  "htcondor_bigmem_longlunch",
106  "htcondor_bigmem_workday",
107  "htcondor_bigmem_tomorrow",
108  "htcondor_bigmem_testmatch",
109  "htcondor_bigmem_nextweek")
110 if lib.get_class("pede") not in allowed_pede_classes:
111  print("Bad job class for pede in class", args.job_class)
112  print("Allowed classes:")
113  for pede_class in allowed_pede_classes:
114  print(" -", pede_class)
115  sys.exit(1)
116 
117 if args.setup_merge:
118  if args.merge_script == "":
119  args.merge_script = args.batch_script + "merge"
120  if not os.access(args.merge_script, os.R_OK):
121  print("Bad merge script file name", args.merge_script)
122  sys.exit(1)
123 
124 if args.mss_dir.strip() != "":
125  if ":" in args.mss_dir:
126  lib.mssDirPool = args.mss_dir.split(":")
127  lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
128  lib.mssDir = args.mss_dir
129 
130 pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB
131 
132 # Try to guess the memory requirements from the pede executable name.
133 # 2.5GB is used as default otherwise.
134 # AP - 23.03.2010
135 cms_process = mps_tools.get_process_object(args.config_template)
136 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
137 pedeMemDef = os.path.basename(pedeMemDef) # This is the pede executable (only the file name, eg "pede_4GB").
138 pedeMemDef = pedeMemDef.split("_")[-1]
139 pedeMemDef = pedeMemDef.replace("GB", "")
140 try:
141  pedeMemDef = 1024*float(pedeMemDef)
142  if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin # pedeMemDef must be >= pedeMemMin.
143 except ValueError:
144  pedeMemDef = int(1024*2.5)
145 
146 
147 # Allocate memory for the pede job.
148 # The value specified by the user (-M option) prevails on the one evinced from the executable name.
149 # AP - 23.03.2010
150 if not args.memory or args.memory < pedeMemMin:
151  print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
152  lib.pedeMem = args.memory = pedeMemDef
153 
154 # Create the job directories
155 nJobExist = 0
156 if args.append and os.path.isdir("jobData"):
157  # Append mode, and "jobData" exists
158  jobs = os.listdir("jobData")
159  job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
160  existing_jobs = [job_regex.search(item) for item in jobs]
161  existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
162  nJobExist = sorted(existing_jobs)[-1]
163 
164 if nJobExist == 0 or nJobExist <=0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
165  # Delete all
166  mps_tools.remove_existing_object("jobData")
167  os.makedirs("jobData")
168  nJobExist = 0;
169 
170 for j in range(1, args.n_jobs + 1):
171  i = j+nJobExist
172  jobdir = "job{0:03d}".format(i)
173  print("jobdir", jobdir)
174  os.makedirs(os.path.join("jobData", jobdir))
175 
176 # build the absolute job directory path (needed by mps_script)
177 theJobData = os.path.abspath("jobData")
178 print("theJobData =", theJobData)
179 
180 if args.append:
181  # save current values
182  tmpBatchScript = lib.batchScript
183  tmpCfgTemplate = lib.cfgTemplate
184  tmpInfiList = lib.infiList
185  tmpNJobs = lib.nJobs
186  tmpClass = lib.classInf
187  tmpMergeScript = lib.mergeScript
188  tmpDriver = lib.driver
189 
190  # Read DB file
191  lib.read_db()
192 
193  # check if last job is a merge job
194  if lib.JOBDIR[lib.nJobs] == "jobm":
195  # remove the merge job
196  lib.JOBDIR.pop()
197  lib.JOBID.pop()
198  lib.JOBSTATUS.pop()
199  lib.JOBNTRY.pop()
200  lib.JOBRUNTIME.pop()
201  lib.JOBNEVT.pop()
202  lib.JOBHOST.pop()
203  lib.JOBINCR.pop()
204  lib.JOBREMARK.pop()
205  lib.JOBSP1.pop()
206  lib.JOBSP2.pop()
207  lib.JOBSP3.pop()
208 
209  # Restore variables
210  lib.batchScript = tmpBatchScript
211  lib.cfgTemplate = tmpCfgTemplate
212  lib.infiList = tmpInfiList
213  lib.nJobs = tmpNJobs
214  lib.classInf = tmpClass
215  lib.mergeScript = tmpMergeScript
216  lib.driver = tmpDriver
217 
218 
219 # Create (update) the local database
220 for j in range(1, args.n_jobs + 1):
221  i = j+nJobExist
222  jobdir = "job{0:03d}".format(i)
223  lib.JOBDIR.append(jobdir)
224  lib.JOBID.append("")
225  lib.JOBSTATUS.append("SETUP")
226  lib.JOBNTRY.append(0)
227  lib.JOBRUNTIME.append(0)
228  lib.JOBNEVT.append(0)
229  lib.JOBHOST.append("")
230  lib.JOBINCR.append(0)
231  lib.JOBREMARK.append("")
232  lib.JOBSP1.append("")
233  if args.weight is not None:
234  lib.JOBSP2.append(str(args.weight))
235  else:
236  lib.JOBSP2.append("")
237  lib.JOBSP3.append(args.name)
238 
239  # create the split card files
240  cmd = ["mps_split.pl", args.input_file_list,
241  str(j if args.max_events is None else 1),
242  str(args.n_jobs if args.max_events is None else 1)]
243  print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
244  with open("jobData/{}/theSplit".format(jobdir), "w") as f:
245  try:
246  subprocess.check_call(cmd, stdout = f)
247  except subprocess.CalledProcessError:
248  print(" split failed")
249  lib.JOBSTATUS[i-1] = "FAIL"
250  theIsn = "{0:03d}".format(i)
251 
252  # create the cfg file
253  cmd = ["mps_splice.py", args.config_template,
254  "jobData/{}/theSplit".format(jobdir),
255  "jobData/{}/the.py".format(jobdir), theIsn]
256  if args.max_events is not None:
257  chunk_size = int(args.max_events/args.n_jobs)
258  event_options = ["--skip-events", str(chunk_size*(j-1))]
259  max_events = (args.max_events - (args.n_jobs-1)*chunk_size
260  if j == args.n_jobs # last job gets the remaining events
261  else chunk_size)
262  event_options.extend(["--max-events", str(max_events)])
263  cmd.extend(event_options)
264  print(" ".join(cmd))
265  mps_tools.run_checked(cmd)
266 
267  # create the run script
268  print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
269  mps_tools.run_checked(["mps_script.pl", args.batch_script,
270  "jobData/{}/theScript.sh".format(jobdir),
271  os.path.join(theJobData, jobdir), "the.py",
272  "jobData/{}/theSplit".format(jobdir), theIsn,
273  args.mss_dir, lib.mssDirPool])
274 
275 
276 # create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
277 jobdir = "jobm";
278 lib.JOBDIR.append(jobdir)
279 lib.JOBID.append("")
280 lib.JOBSTATUS.append("SETUP")
281 lib.JOBNTRY.append(0)
282 lib.JOBRUNTIME.append(0)
283 lib.JOBNEVT.append(0)
284 lib.JOBHOST.append("")
285 lib.JOBINCR.append(0)
286 lib.JOBREMARK.append("")
287 lib.JOBSP1.append("")
288 lib.JOBSP2.append("")
289 lib.JOBSP3.append("")
290 
291 lib.write_db();
292 
293 # if merge mode, create the directory and set up contents
294 if args.setup_merge:
295  shutil.rmtree("jobData/jobm", ignore_errors = True)
296  os.makedirs("jobData/jobm")
297  print("Create dir jobData/jobm")
298 
299  # We want to merge old and new jobs
300  nJobsMerge = args.n_jobs+nJobExist
301 
302  # create merge job cfg
303  print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
304  mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
305  "jobData/jobm/alignment_merge.py",
306  os.path.join(theJobData, "jobm"), str(nJobsMerge)])
307 
308  # create merge job script
309  print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
310  mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
311  "jobData/jobm/theScript.sh",
312  os.path.join(theJobData, "jobm"),
313  "alignment_merge.py", str(nJobsMerge), args.mss_dir,
314  lib.mssDirPool])
315 
316 
317 # Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
318 # in jobData
319 backups = os.listdir("jobData")
320 bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
321 existing_backups = [bu_regex.search(item) for item in backups]
322 existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
323 i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
324 ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
325 ScriptCfg = os.path.join("jobData", ScriptCfg)
326 os.makedirs(ScriptCfg)
327 for f in (args.batch_script, args.config_template, args.input_file_list):
328  shutil.copy2(f, ScriptCfg)
329 if args.setup_merge:
330  shutil.copy2(args.merge_script, ScriptCfg)
331 
332 with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
333 shutil.rmtree(ScriptCfg)
334 
335 
336 # Write to DB
337 lib.write_db();
338 lib.read_db();
339 lib.print_memdb();
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
#define str(s)