CMS 3D CMS Logo

mps_setup.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 from __future__ import print_function
4 from builtins import range
5 import os
6 import re
7 import sys
8 import shutil
9 import tarfile
10 import argparse
11 import subprocess
12 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
13 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
14 
15 
# Command-line interface of the script: a handful of optional switches
# followed by eight mandatory positional arguments.
parser = argparse.ArgumentParser(description="Setup local mps database")

# --- optional arguments -----------------------------------------------------
parser.add_argument(
    "-m", "--setup-merge",
    dest="setup_merge",
    action="store_true",  # 'store_true' already implies a default of False
    help="setup pede merge job",
)
parser.add_argument(
    "-a", "--append",
    action="store_true",
    help="append jobs to existing list",
)
parser.add_argument(
    "-M", "--memory",
    type=int,  # seems to be obsolete
    help="memory (MB) to be allocated for pede",
)
parser.add_argument(
    "-N", "--name",  # remove restrictions on job name?
    help=("name to be assigned to the jobs; Whitespaces and "
          "colons are not allowed"),
)
parser.add_argument(
    "-w", "--weight",
    type=float,
    help="assign statistical weight",
)
parser.add_argument(
    "-e", "--max-events",
    dest="max_events",
    type=int,
    help="maximum number of events to process",
)

# --- positional arguments (registered in exactly this order) -----------------
for _positional, _options in (
        ("batch_script",
         {"help": "path to the mille batch script template"}),
        ("config_template",
         {"help": "path to the config template"}),
        ("input_file_list",
         {"help": "path to the input file list"}),
        ("n_jobs",
         {"type": int,
          "help": "number of jobs assigned to this dataset"}),
        ("job_class",
         {"help": ("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                   "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                   "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                   "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                   "contains a ':' the part before ':' defines the class for "
                   "mille jobs and the part after defines the pede job class")}),
        ("job_name",
         {"help": "name assigned to batch jobs"}),
        ("merge_script",
         {"help": "path to the pede batch script template"}),
        ("mss_dir",
         {"help": "name of the mass storage directory"}),
):
    parser.add_argument(_positional, **_options)
53 
# Parse the command line; without an explicit list argparse reads sys.argv[1:].
args = parser.parse_args()


# setup mps database
# Copy the parsed command-line values onto a fresh job database object.
lib = mpslib.jobdatabase()
for _attribute, _value in (
        ("batchScript", args.batch_script),
        ("cfgTemplate", args.config_template),
        ("infiList", args.input_file_list),
        ("nJobs", args.n_jobs),
        ("classInf", args.job_class),
        ("addFiles", args.job_name),  # the job name is kept in 'addFiles'
        ("driver", "merge" if args.setup_merge else ""),
        ("mergeScript", args.merge_script),
        ("mssDirPool", ""),  # may be filled later from a "pool:dir" mss_dir
        ("mssDir", args.mss_dir),
        ("pedeMem", args.memory),
):
    setattr(lib, _attribute, _value)
70 
71 
# Abort with exit code 1 as soon as one of the mandatory input files is not
# readable; the files are checked in the order listed here.
for _complaint, _path in (
        ("Bad 'batch_script' script name", args.batch_script),
        ("Bad 'config_template' file name", args.config_template),
        ("Bad input list file", args.input_file_list),
):
    if not os.access(_path, os.R_OK):
        print(_complaint, _path)
        sys.exit(1)

# ignore 'append' flag if mps database is not yet created
if not os.access("mps.db", os.R_OK):
    args.append = False
86 
# Job classes accepted for the mille jobs.
allowed_mille_classes = (
    "lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
    "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
    "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
    "cmsexpress", "htcondor_cafalca_espresso", "htcondor_espresso",
    "htcondor_cafalca_microcentury", "htcondor_microcentury",
    "htcondor_cafalca_longlunch", "htcondor_longlunch",
    "htcondor_cafalca_workday", "htcondor_workday",
    "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
    "htcondor_cafalca_testmatch", "htcondor_testmatch",
    "htcondor_cafalca_nextweek", "htcondor_nextweek",
)

# Job classes accepted for the pede job.
allowed_pede_classes = (
    "lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
    "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
    "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
    "htcondor_bigmem_espresso",
    "htcondor_bigmem_microcentury",
    "htcondor_bigmem_longlunch",
    "htcondor_bigmem_workday",
    "htcondor_bigmem_tomorrow",
    "htcondor_bigmem_testmatch",
    "htcondor_bigmem_nextweek",
)

# Check the mille class first, then the pede class. On the first mismatch
# the list of allowed classes is printed and the script exits with code 1.
for _step, _allowed in (("mille", allowed_mille_classes),
                        ("pede", allowed_pede_classes)):
    if lib.get_class(_step) in _allowed:
        continue
    print("Bad job class for {} in class".format(_step), args.job_class)
    print("Allowed classes:")
    for _class_name in _allowed:
        print(" -", _class_name)
    sys.exit(1)
120 
# In merge mode a readable pede (merge) batch script is mandatory.
if args.setup_merge:
    if args.merge_script == "":
        # An empty argument falls back to "<batch_script>merge".
        args.merge_script = args.batch_script + "merge"
        # Keep the database object in sync with the script that is actually
        # checked below and later handed to mps_scriptm.pl; it was filled
        # from the (empty) command-line value before this default was applied.
        lib.mergeScript = args.merge_script
    if not os.access(args.merge_script, os.R_OK):
        print("Bad merge script file name", args.merge_script)
        sys.exit(1)

# A mass storage directory of the form "<pool>:<directory>" is split at the
# first colon: the part before it becomes the pool, everything after it
# (including any further colons) stays the directory.
if ":" in args.mss_dir:
    lib.mssDirPool, _separator, args.mss_dir = args.mss_dir.partition(":")
    lib.mssDir = args.mss_dir
133 
pedeMemMin = 1024  # Minimum memory allocated for pede: 1024MB=1GB

# Try to guess the memory requirements from the pede executable name.
# 2.5GB is used as default otherwise.
# AP - 23.03.2010
cms_process = mps_tools.get_process_object(args.config_template)
pede_command = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
# Only the file name of the executable is inspected (eg "pede_4GB"): take the
# text after the last underscore and strip the "GB" unit.
size_tag = os.path.basename(pede_command).split("_")[-1].replace("GB", "")
try:
    # The -M option is declared with type=int, so the guessed default is kept
    # an integer number of MB as well (the fallback below already is one).
    # pedeMemDef must be >= pedeMemMin.
    pedeMemDef = max(int(1024*float(size_tag)), pedeMemMin)
except (ValueError, OverflowError):
    # No usable number in the executable name (OverflowError covers "inf").
    pedeMemDef = int(1024*2.5)


# Allocate memory for the pede job.
# A value given by the user (-M option) takes precedence over the one derived
# from the executable name, unless it is missing or below the minimum.
# AP - 23.03.2010
if not args.memory or args.memory < pedeMemMin:
    # Terminate the line normally so that later output starts on a new line.
    print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef))
    lib.pedeMem = args.memory = pedeMemDef
157 
# Create the job directories
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    # Append mode, and "jobData" exists: continue numbering after the highest
    # job number found among the existing directory entries.
    job_regex = re.compile(r"job([0-9]{3})")  # should we really restrict it to 3 digits?
    existing_jobs = [int(match.group(1))
                     for match in map(job_regex.search, os.listdir("jobData"))
                     if match is not None]
    # "jobData" may exist without any "jobNNN" entry; treat that like a fresh
    # setup instead of failing on an empty list.
    nJobExist = max(existing_jobs) if existing_jobs else 0

if nJobExist <= 0 or nJobExist > 999:  # quite rude method... -> enforce job number limit earlier?
    # Delete all
    mps_tools.remove_existing_object("jobData")
    os.makedirs("jobData")
    nJobExist = 0

# One directory per new job, numbered after the already existing ones.
for j in range(1, args.n_jobs + 1):
    jobdir = "job{0:03d}".format(j + nJobExist)
    print("jobdir", jobdir)
    os.makedirs(os.path.join("jobData", jobdir))

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.abspath("jobData")
print("theJobData =", theJobData)
183 
if args.append:
    # Remember the current settings; they are put back after read_db()
    # (presumably because read_db() replaces them with the stored values).
    _kept_settings = [
        (_name, getattr(lib, _name))
        for _name in ("batchScript", "cfgTemplate", "infiList", "nJobs",
                      "classInf", "mergeScript", "driver")
    ]

    # Read DB file
    lib.read_db()

    # check if last job is a merge job
    if lib.JOBDIR[lib.nJobs] == "jobm":
        # remove the merge job: drop the last entry of every per-job column
        for _column in (lib.JOBDIR, lib.JOBID, lib.JOBSTATUS, lib.JOBNTRY,
                        lib.JOBRUNTIME, lib.JOBNEVT, lib.JOBHOST, lib.JOBINCR,
                        lib.JOBREMARK, lib.JOBSP1, lib.JOBSP2, lib.JOBSP3):
            _column.pop()

    # Restore variables
    for _name, _value in _kept_settings:
        setattr(lib, _name, _value)
221 
222 
# Create (update) the local database
for j in range(1, args.n_jobs + 1):
    i = j + nJobExist               # overall job number
    theIsn = "{0:03d}".format(i)    # zero-padded job number
    jobdir = "job" + theIsn

    # New database row for this job in state "SETUP".
    for _column, _initial in (
            (lib.JOBDIR, jobdir),
            (lib.JOBID, ""),
            (lib.JOBSTATUS, "SETUP"),
            (lib.JOBNTRY, 0),
            (lib.JOBRUNTIME, 0),
            (lib.JOBNEVT, 0),
            (lib.JOBHOST, ""),
            (lib.JOBINCR, 0),
            (lib.JOBREMARK, ""),
            (lib.JOBSP1, ""),
            (lib.JOBSP2, "" if args.weight is None else str(args.weight)),
            (lib.JOBSP3, args.name),
    ):
        _column.append(_initial)

    # create the split card files
    split_file = "jobData/{}/theSplit".format(jobdir)
    if args.max_events is None:
        split_cmd = ["mps_split.pl", args.input_file_list,
                     str(j), str(args.n_jobs)]
    else:
        # With an event limit the arguments "1 1" are passed for every job;
        # the work is then divided through the event options further down.
        split_cmd = ["mps_split.pl", args.input_file_list, "1", "1"]
    print(" ".join(split_cmd) + " > " + split_file)
    with open(split_file, "w") as split_output:
        try:
            subprocess.check_call(split_cmd, stdout=split_output)
        except subprocess.CalledProcessError:
            print(" split failed")
            lib.JOBSTATUS[i-1] = "FAIL"

    # create the cfg file
    splice_cmd = ["mps_splice.py", args.config_template, split_file,
                  "jobData/{}/the.py".format(jobdir), theIsn]
    if args.max_events is not None:
        chunk_size = int(args.max_events/args.n_jobs)
        if j == args.n_jobs:
            # last job gets the remaining events
            events_for_job = args.max_events - (args.n_jobs-1)*chunk_size
        else:
            events_for_job = chunk_size
        splice_cmd += ["--skip-events", str(chunk_size*(j-1)),
                       "--max-events", str(events_for_job)]
    print(" ".join(splice_cmd))
    mps_tools.run_checked(splice_cmd)

    # create the run script
    script_cmd = ["mps_script.pl", args.batch_script,
                  "jobData/{}/theScript.sh".format(jobdir),
                  os.path.join(theJobData, jobdir), "the.py",
                  split_file, theIsn,
                  args.mss_dir, lib.mssDirPool]
    print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(script_cmd)
278 
279 
# create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
jobdir = "jobm"
# Same columns as for a regular job, but with empty spare fields.
for _column, _initial in (
        (lib.JOBDIR, jobdir),
        (lib.JOBID, ""),
        (lib.JOBSTATUS, "SETUP"),
        (lib.JOBNTRY, 0),
        (lib.JOBRUNTIME, 0),
        (lib.JOBNEVT, 0),
        (lib.JOBHOST, ""),
        (lib.JOBINCR, 0),
        (lib.JOBREMARK, ""),
        (lib.JOBSP1, ""),
        (lib.JOBSP2, ""),
        (lib.JOBSP3, ""),
):
    _column.append(_initial)

lib.write_db()
296 
# if merge mode, create the directory and set up contents
if args.setup_merge:
    # Start from an empty merge directory; a missing one is not an error.
    merge_dir = "jobData/jobm"
    shutil.rmtree(merge_dir, ignore_errors=True)
    os.makedirs(merge_dir)
    print("Create dir jobData/jobm")

    # We want to merge old and new jobs
    nJobsMerge = args.n_jobs + nJobExist
    merge_dir_abs = os.path.join(theJobData, "jobm")

    # create merge job cfg
    merge_cfg_cmd = ["mps_merge.py", "-w", args.config_template,
                     "jobData/jobm/alignment_merge.py",
                     merge_dir_abs, str(nJobsMerge)]
    print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
    mps_tools.run_checked(merge_cfg_cmd)

    # create merge job script
    merge_script_cmd = ["mps_scriptm.pl", args.merge_script,
                        "jobData/jobm/theScript.sh",
                        merge_dir_abs,
                        "alignment_merge.py", str(nJobsMerge), args.mss_dir,
                        lib.mssDirPool]
    print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(merge_script_cmd)
319 
320 
# Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
# in jobData
# The archive number is one above the highest "ScriptsAndCfgNNN.tar" already
# present, starting at 1.
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
existing_backups = [int(match.group(1))
                    for match in map(bu_regex.search, os.listdir("jobData"))
                    if match is not None]
i = (max(existing_backups) if existing_backups else 0) + 1
ScriptCfg = os.path.join("jobData", "ScriptsAndCfg{0:03d}".format(i))
os.makedirs(ScriptCfg)

files_to_backup = [args.batch_script, args.config_template, args.input_file_list]
if args.setup_merge:
    files_to_backup.append(args.merge_script)
for f in files_to_backup:
    shutil.copy2(f, ScriptCfg)

# Pack the staging directory into an uncompressed tar file, then remove it.
with tarfile.open(ScriptCfg + ".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)


# Write to DB
lib.write_db()
lib.read_db()
lib.print_memdb()
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
dqmMemoryStats.float
float
Definition: dqmMemoryStats.py:127
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
str
#define str(s)
Definition: TestProcessor.cc:53
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
createfilelist.int
int
Definition: createfilelist.py:10
format