CMS 3D CMS Logo

mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
16 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
17 import os
18 import sys
19 import shutil
20 import cPickle
21 import subprocess
22 import re
23 import argparse
24 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Runs ``voms-proxy-info --exists`` as a check and then copies the file
    reported by ``voms-proxy-info --path`` to ``<rundir>/.user_proxy``.
    The script is terminated with exit code 1 if the check fails or if
    ``voms-proxy-info`` cannot be executed at all.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    # check first if proxy is set
    try:
        subprocess.check_call(["voms-proxy-info", "--exists"])
    except subprocess.CalledProcessError:
        print("Please initialize your proxy before submitting.")
        sys.exit(1)
    except OSError as e:
        # the executable itself could not be started (e.g. it is not in PATH);
        # report this instead of dying with a traceback
        print("Could not execute 'voms-proxy-info': %s" % e)
        sys.exit(1)

    # strip the trailing newline from the reported path before using it
    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
41 
42 
43 
44 
# Command line interface: an optional positional job count plus switches that
# select Mille or Pede submission.
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help="submit all setup Mille jobs; maxJobs is ignored")
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help="submit all setup Pede jobs; maxJobs is ignored")
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
parser.add_argument("-p", "--forward-proxy", dest="forwardProxy", default=False,
                    action="store_true",
                    help="forward VOMS proxy to batch system")
args = parser.parse_args(sys.argv[1:])
64 
65 
# load the locally stored mps job database
lib = mpslib.jobdatabase()
lib.read_db()

# with '-a' every Mille job is submitted; a 'maxJobs' given by the user is
# overridden by the number of jobs in the database
if args.allMille:
    args.maxJobs = lib.nJobs

# absolute path of the job directories (needed by mps_script)
theJobData = os.path.join(os.getcwd(), "jobData")

# job name handed to bsub: lib.addFiles when it is non-empty, else the default
theJobName = lib.addFiles if lib.addFiles != '' else 'mpalign'
80 
81 # fire the 'normal' parallel Jobs (Mille Jobs)
82 if not args.fireMerge:
83  #set the resources string coming from mps.db
84  resources = lib.get_class('mille')
85 
86  # "cmscafspec" found in $resources: special cmscaf resources
87  if 'cmscafspec' in resources:
88  print '\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n'
89  queue = resources
90  queue = queue.replace('cmscafspec','cmscaf')
91  resources = '-q'+queue+'-R cmscafspec' # FIXME why?
92  resources = '-q cmscafalcamille'
93  # "cmscaf" found in $resources
94  elif 'cmscaf' in resources:
95  # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
96  resources = '-q'+resources+' -m g_cmscaf'
97  else:
98  resources = '-q '+resources
99 
100  nSub = 0 # number of submitted Jobs
101  for i in xrange(lib.nJobs):
102  if lib.JOBSTATUS[i] == 'SETUP':
103  if nSub < args.maxJobs:
104  if args.forwardProxy:
105  forward_proxy(os.path.join(theJobData,lib.JOBDIR[i]))
106 
107  # submit a new job with 'bsub -J ...' and check output
108  # for some reasons LSF wants script with full path
109  submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
110  (theJobName, resources, theJobData, lib.JOBDIR[i])
111  print submission
112  try:
113  result = subprocess.check_output(submission,
114  stderr=subprocess.STDOUT,
115  shell=True)
116  except subprocess.CalledProcessError as e:
117  result = "" # -> check for successful job submission will fail
118  print ' '+result,
119  result = result.strip()
120 
121  # check if job was submitted and updating jobdatabase
122  match = re.search('Job <(\d+)> is submitted', result)
123  if match:
124  # need standard format for job number
125  lib.JOBSTATUS[i] = 'SUBTD'
126  lib.JOBID[i] = int(match.group(1))
127  else:
128  print 'Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result),
129  nSub +=1
130 
131 # fire the merge job
132 else:
133  print 'fire merge'
134  # set the resources string coming from mps.db
135  resources = lib.get_class('pede')
136  if 'cmscafspec' in resources:
137  queue = resources
138  queue = queue.replace('cmscafspec','cmscaf')
139  resources = '-q '+queue+' -R cmscafspec' # FIXME why?
140  resources = '-q cmscafalcamille'
141  else:
142  resources = '-q '+resources
143 
144  # Allocate memory for pede job FIXME check documentation for bsub!!!!!
145  resources = resources+' -R \"rusage[mem="%s"]\"' % str(lib.pedeMem) # FIXME the dots? -> see .pl
146 
147  # check whether all other jobs are OK
148  mergeOK = True
149  for i in xrange(lib.nJobs):
150  if lib.JOBSTATUS[i] != 'OK':
151  if 'DISABLED' not in lib.JOBSTATUS[i]:
152  mergeOK = False
153  break
154 
155  # loop over merge jobs
156  i = lib.nJobs
157  while i<len(lib.JOBDIR):
158  jobNumFrom1 = i+1
159 
160  # check if current job in SETUP mode or if forced
161  if lib.JOBSTATUS[i] != 'SETUP':
162  print 'Merge job %d status %s not submitted.' % \
163  (jobNumFrom1, lib.JOBSTATUS[i])
164  elif not (mergeOK or args.forceMerge):
165  print 'Merge job',jobNumFrom1,'not submitted since Mille jobs error/unfinished (Use -m -f to force).'
166  else:
167  # some paths for clarity
168  Path = '%s/%s' % (theJobData,lib.JOBDIR[i])
169  backupScriptPath = Path+'/theScript.sh.bak'
170  scriptPath = Path+'/theScript.sh'
171 
172  # force option invoked:
173  if args.forceMerge:
174 
175  # make a backup copy of the script first, if it doesn't already exist.
176  if not os.path.isfile(backupScriptPath):
177  os.system('cp -p '+scriptPath+' '+backupScriptPath)
178 
179  # get the name of merge cfg file -> either the.py or alignment_merge.py
180  command = 'cat '+backupScriptPath+' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\''
181  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
182  mergeCfg = mergeCfg.strip()
183 
184  # make a backup copy of the cfg
185  backupCfgPath = Path+'/%s.bak' % mergeCfg
186  cfgPath = Path+'/%s' % mergeCfg
187  if not os.path.isfile(backupCfgPath):
188  os.system('cp -p '+cfgPath+' '+backupCfgPath)
189 
190  # retrieve weights configuration
191  with open(os.path.join(Path, ".weights.pkl"), "rb") as f:
192  weight_conf = cPickle.load(f)
193 
194  # blank weights
195  mps_tools.run_checked(["mps_weight.pl", "-c"])
196 
197  # apply weights
198  for name,weight in weight_conf:
199  print " ".join(["mps_weight.pl", "-N", name, weight])
200  mps_tools.run_checked(["mps_weight.pl", "-N", name, weight])
201 
202  # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
203  inCfgPath = theJobData+'/'+lib.JOBDIR[0]+'/the.py'
204  command ='mps_merge.py -w -c '+inCfgPath+' '+Path+'/'+mergeCfg+' '+Path+' '+str(lib.nJobs)
205  os.system(command)
206 
207  # rewrite theScript.sh using inly 'OK' jobs
208  command = 'mps_scriptm.pl -c '+lib.mergeScript+' '+scriptPath+' '+Path+' '+mergeCfg+' '+str(lib.nJobs)+' '+lib.mssDir+' '+lib.mssDirPool
209  os.system(command)
210 
211  else:
212  # restore the backup copy of the script
213  if os.path.isfile(backupScriptPath):
214  os.system('cp -pf '+backupScriptPath+' '+scriptPath)
215 
216  # get the name of merge cfg file
217  command = "cat "+scriptPath+" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'"
218  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
219  command = 'basename '+mergeCfg
220  mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
221  mergeCfg = mergeCfg.replace('\n','')
222 
223  # restore the backup copy of the cfg
224  backupCfgPath = Path+'/%s.bak' % mergeCfg
225  cfgPath = Path+'/%s' % mergeCfg
226  if os.path.isfile(backupCfgPath):
227  os.system('cp -pf '+backupCfgPath+' '+cfgPath)
228 
229  # end of if/else forceMerge
230 
231  # submit merge job
232  nMerge = i-lib.nJobs # 'index' of this merge job
233  curJobName = 'm'+str(nMerge)+'_'+theJobName
234  if args.forwardProxy: forward_proxy(os.path.dirname(scriptPath))
235  submission = 'bsub -J %s %s %s' % (curJobName,resources,scriptPath)
236  result = subprocess.check_output(submission, stderr=subprocess.STDOUT, shell=True)
237  print ' '+result,
238  result = result.strip()
239 
240  # check if merge job was submitted and updating jobdatabase
241  match = re.search('Job <(\d+)> is submitted', result)
242  if match:
243  # need standard format for job number
244  lib.JOBSTATUS[i] = 'SUBTD'
245  lib.JOBID[i] = int(match.group(1))
246  print 'jobid is',lib.JOBID[i]
247  else:
248  print 'Submission of merge job seems to have failed:',result,
249 
250  i +=1
251  # end of while on merge jobs
252 
253 
254 lib.write_db()
def forward_proxy(rundir)
Definition: mps_fire.py:25
static std::string join(char **cmd)
Definition: RemoteFile.cc:18