CMS 3D CMS Logo

parallelization.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # This script sets up parallel jobs for the build, integrate and run
4 # step when using Herwig with the CMSSW framework.
5 # It takes a cmsRun file, adjusts the parameters in it accordingly to
6 # the options and saves them to temporary cmsRun files. For each step
7 # a different cmsRun file is created. The original file remains
8 # unaltered.
9 
10 # Possible options:
11 # -b/--build : sets the number of build jobs and starts the build step.
12 # -i/--integrate : sets the maximal number of integration jobs
13 # This option already has to be set when the build step is invoked.
14 # The integration step will be performed if this option is set,
15 # unless --nointegration is chosen.
16 # The actual number of integration jobs may be smaller. It is
17 # determined by the number of files in Herwig-scratch/Build.
18 # -r/--run : sets the number of run jobs and starts the run step.
19 # --nointegration : use this option to set up several integration jobs
20 # without actually performing them
21 # --stoprun: use this option if you want to create the cmsRun files
22 # without calling cmsRun
23 # --resumerun: no new cmsRun files for the run step will be created
24 # For this option to work 'temporary' cmsRun files complying to the
25 # naming scheme have to be availible. Only files up to the number
26 # of jobs defined by --run will be considered.
27 # --keepfiles : don't remove the created temporary cmsRun files
28 # --l/--log: write the output of each shell command called in a
29 # seperate log file
30 
31 # Comments in the cmsRun file in the process.generator part may confuse
32 # this script. Check the temporary cmsRun files if errors occur.
33 
34 # A parallelized run step is achieved by calling cmsRun an according
35 # number of times with different seeds for Herwig. The built in feature
36 # of Herwig wont be used.
37 
38 # Author: Dominik Beutel
39 
40 
41 from __future__ import print_function
42 import argparse
43 import sys
44 import os
45 import subprocess
46 import re
47 
48 
49 
50 def uint(string):
51  """Unsigned int type"""
52  value = int(string)
53  if value < 0:
54  msg = '{0} is negative'.format(string)
55  raise argparse.ArgumentTypeError(msg)
56  return value
57 
58 
59 
60 def adjust_pset(cmsrunfilename, savefilename, par_list):
61  """Takes the cmsRun filem, removes all occurences of runMode, jobs,
62  maxJobs and integrationList parameters in the process.generator
63  part.
64  The the parameters in par_list are set instead and saved.
65  """
66 
67  with open(cmsrunfilename, 'r') as readfile:
68  parsestring = readfile.read()
69 
70  # get first opening bracket after process.generator
71  begin_gen_step = parsestring.find('(', parsestring.find('process.generator'))
72 
73  # find matching bracket
74  end_gen_step = begin_gen_step
75  bracket_counter = 1
76  for position in range(begin_gen_step+1, len(parsestring)):
77  if parsestring[position] == '(':
78  bracket_counter += 1
79  if parsestring[position] == ')':
80  bracket_counter -= 1
81  if not bracket_counter:
82  end_gen_step = position
83  break
84 
85  # get string between brackets
86  gen_string = parsestring[begin_gen_step+1:end_gen_step]
87 
88  # remove all parameters that would interfere
89  gen_string = re.sub(r',\s*runModeList\s*=\s*cms.untracked.string\((.*?)\)', '', gen_string)
90  gen_string = re.sub(r',\s*jobs\s*=\s*cms.untracked.int32\((.*?)\)', '', gen_string)
91  gen_string = re.sub(r',\s*integrationList\s*=\s*cms.untracked.string\((.*?)\)', '', gen_string)
92  gen_string = re.sub(r',\s*maxJobs\s*=\s*cms.untracked.uint32\((.*?)\)', '', gen_string)
93  gen_string = re.sub(r',\s*seed\s*=\s*cms.untracked.int32\((.*?)\)', '', gen_string)
94 
95 
96  # write the savefile with all parameters given in par_list
97  with open(savefilename,'w') as savefile:
98  savefile.write(parsestring[:begin_gen_step+1])
99  savefile.write(gen_string)
100  for item in par_list:
101  savefile.write(',\n')
102  savefile.write(item)
103  savefile.write(parsestring[end_gen_step:])
104 
105 
106 
107 def cleanupandexit(filelist):
108  """Delete the files in filelist and exit"""
109  for filename in filelist:
110  os.remove(filename)
111  sys.exit(0)
112 
113 
114 
115 
116 
119 
120 parser = argparse.ArgumentParser()
121 
122 parser.add_argument('cmsRunfile', help='filename of the cmsRun configuration')
123 parser.add_argument('-b', '--build', help='set the number of build jobs', type=int, choices=range(0,11), default=0)
124 parser.add_argument('-i', '--integrate', help='set the maximal number of integration jobs', type=uint, default=0)
125 parser.add_argument('-r', '--run', help='set the number of run jobs', type=int, choices=range(0,11), default=0)
126 parser.add_argument('--nointegration', help='build -i integration jobs without actually integrating', action='store_true')
127 parser.add_argument('--keepfiles', help='don\'t delete temporary files', action='store_true')
128 parser.add_argument('--stoprun', help='stop after creating the cmsRun files for the run step', action='store_true')
129 parser.add_argument('--resumerun', help='use existing \'temporary\' files for the run step', action='store_true')
130 parser.add_argument('-l', '--log', help='write the output of each process in a separate log file', action='store_true')
131 
132 args = parser.parse_args()
133 
134 # List of files needed for clean-up
135 cleanupfiles = []
136 
137 # Create a template name for all created files
138 template_name = args.cmsRunfile.replace('.', '_')
139 
140 
141 
142 
145 
146 
147 
148 # jobs defines number of build jobs in the cmsRun file
149 # maxJobs tells Herwig to prepare the according number
150 # of integrations
151 
152 if args.build != 0:
153  # Set up parameters
154  parameters = ['runModeList = cms.untracked.string(\'build\')']
155  parameters.append('jobs = cms.untracked.int32(' + str(args.build) + ')')
156  if args.integrate != 0:
157  parameters.append('maxJobs = cms.untracked.uint32(' + str(args.integrate) + ')')
158 
159  build_name = template_name + '_build.py'
160  adjust_pset(args.cmsRunfile, build_name, parameters)
161 
162  cleanupfiles.append(build_name)
163 
164  # Start build job
165  print('Setting up {0} build jobs.'.format(str(args.build)))
166  print('Setting up a maximum of {0} integration jobs.'.format(str(args.integrate)))
167  print('Calling\t\'cmsRun ' + build_name + '\'')
168 
169  if args.log:
170  print('Writing ouput to log file: ' + build_name[:-2] + 'log')
171  with open(build_name[:-2] + 'log', 'w') as build_log:
172  process = subprocess.Popen(['cmsRun', build_name], stdout=build_log, stderr=subprocess.STDOUT)
173  else:
174  process = subprocess.Popen(['cmsRun ' + build_name], shell=True)
175  process.wait()
176 
177  print('--------------------')
178  print('Build step finished.')
179  print('--------------------')
180 
181 
182 
183 
184 
185 # Stop in case no integration is desired
186 if args.nointegration:
187  print('--nointegration: Run will be stopped here.')
188  cleanupandexit(cleanupfiles)
189 
190 if args.integrate != 0:
191  # Determine number of integration jobs
192  actual_int_jobs = len([string for string in os.listdir('Herwig-scratch/Build') if re.match(r'integrationJob[0-9]+', string)])
193 
194  # Stop if this number exceeds the given parameter
195  if actual_int_jobs > args.integrate:
196  print('Actual number of integration jobs {0} exceeds \'--integrate {1}\'.'.format(actual_int_jobs, args.integrate))
197  print('Integration will not be performed.')
198  cleanupandexit(cleanupfiles)
199 
200  # Start the integration jobs
201  print('Found {0} integration jobs, a maxiumum of {1} was given.'.format(actual_int_jobs, args.integrate))
202  print('Starting all jobs.')
203  if not args.log:
204  print('--- Output may be cluttered. (Try the option -l/--log) ---')
205  processes = []
206  for i in range(actual_int_jobs):
207  # Set up parameters
208  parameters = ['runModeList = cms.untracked.string(\'integrate\')']
209  parameters.append('integrationList = cms.untracked.string(\'' + str(i) + '\')')
210 
211  integration_name = template_name + '_integrate_' + str(i) + '.py'
212  adjust_pset(args.cmsRunfile, integration_name, parameters)
213 
214  cleanupfiles.append(integration_name)
215 
216  print('Calling\t\'cmsRun ' + integration_name + '\'')
217  if args.log:
218  print('Writing ouput to log file: ' + integration_name[:-2] + 'log')
219  with open(integration_name[:-2] + 'log', 'w') as integration_log:
220  processes.append( subprocess.Popen(['cmsRun', integration_name], stdout=integration_log, stderr=subprocess.STDOUT) )
221  else:
222  processes.append( subprocess.Popen(['cmsRun', integration_name]) )
223 
224 
225  # Wait for all processes to finish
226  for process in processes:
227  process.wait()
228  print('--------------------------')
229  print('Integration step finished.')
230  print('--------------------------')
231 
232 
233 
234 
235 
236 
238 
239 
254 
255 
259 
260 
261 if args.stoprun and args.resumerun:
262  print('--stoprun AND --resumerun are chosen: run step will be omitted.')
263  cleanupandexit(cleanupfiles)
264 
265 if args.run != 0:
266  # Start the run jobs
267  print('Setting up {0} runs.'.format(args.run))
268  if not args.log:
269  print('--- Output may be cluttered. (Try the option -l/--log) ---')
270  processes = []
271  for i in range(args.run):
272  run_name = template_name + '_run_' + str(i) + '.py'
273 
274  # Only create new files if this isn't a resumed run
275  if not args.resumerun:
276  parameters = ['runModeList = cms.untracked.string(\'run\')']
277  # Set different seeds
278  parameters.append('seed = cms.untracked.int32(' + str(i) + ')')
279  adjust_pset(args.cmsRunfile, run_name, parameters)
280 
281  # Unless run will be stopped execute the jobs
282  if not args.stoprun:
283  # Don't mark the files for cleanup if this is a resumed run
284  if not args.resumerun:
285  cleanupfiles.append(run_name)
286 
287  if not os.path.isfile(run_name):
288  print('\'' + run_name + '\' not found. It will be skipped.')
289  continue
290 
291  print('Calling\t\'cmsRun ' + run_name + '\'')
292  if args.log:
293  print('Writing ouput to log file: ' + run_name[:-2] + 'log')
294  with open(run_name[:-2] + 'log', 'w') as run_log:
295  processes.append( subprocess.Popen(['cmsRun', run_name], stdout=run_log, stderr=subprocess.STDOUT) )
296  else:
297  processes.append( subprocess.Popen(['cmsRun', run_name]) )
298 
299 
300  # Wait for all processes to finish
301  for process in processes:
302  process.wait()
303  if args.stoprun:
304  print('--stoprun: kept run files and stopped before calling cmsRun')
305  print('------------------')
306  print('Run step finished.')
307  print('------------------')
308 
309 
310 
311 if not args.keepfiles:
312  cleanupandexit(cleanupfiles)
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
parallelization.int
int
Definition: parallelization.py:123
parallelization.uint
uint
Definition: parallelization.py:124
str
#define str(s)
Definition: TestProcessor.cc:52
parallelization.adjust_pset
def adjust_pset(cmsrunfilename, savefilename, par_list)
Definition: parallelization.py:60
parallelization.cleanupandexit
def cleanupandexit(filelist)
Definition: parallelization.py:107
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
format