CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
16 import os
17 import sys
18 import subprocess
19 import re
20 import argparse
21 
# Command-line interface (see the usage note in the file header):
#   maxJobs            optional positional, number of Mille jobs to submit
#   -a / --all         submit every Mille job that is set up
#   -m / --merge       submit the Pede (merge) jobs instead of Mille jobs
#   -f / --force-merge submit Pede even if some Mille jobs are not OK
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help="submit all setup Mille jobs; maxJobs is ignored")
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help="submit all setup Pede jobs; maxJobs is ignored")
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some "+
                          "Mille jobs are not in the OK state"))
# Parse the command line.
args = parser.parse_args(sys.argv[1:])

# Read the job database.
lib = mpslib.jobdatabase()
lib.read_db()

# With -a/--all the 'maxJobs' value supplied by the user is replaced by
# the number of jobs recorded in the database.
if args.allMille:
    args.maxJobs = lib.nJobs

# Absolute path of the job directory (needed by mps_script).
theJobData = os.path.join(os.getcwd(), "jobData")

# Name handed to 'bsub -J': the database's addFiles entry when it is
# non-empty, otherwise the default 'mpalign'.
theJobName = lib.addFiles if lib.addFiles != '' else 'mpalign'
54 
# Line printed by bsub when the batch system has accepted a job.
_SUBMITTED_RE = re.compile(r'Job <(\d+)> is submitted')


def _run_shell(command):
    """Run `command` through the shell and return stdout+stderr as text.

    Raises subprocess.CalledProcessError if the command exits non-zero.
    """
    return subprocess.check_output(command, stderr=subprocess.STDOUT,
                                   shell=True, universal_newlines=True)


def _bsub(submission):
    """Run the bsub command line `submission` and echo its output.

    Returns a tuple (jobId, output): `jobId` is the integer job id reported
    by bsub, or None if no 'Job <N> is submitted' line was found; `output`
    is the stripped text printed by bsub.
    """
    try:
        output = _run_shell(submission)
    except subprocess.CalledProcessError as error:
        # A failing bsub must not abort the script: the caller reports the
        # failure, and jobs submitted earlier in this run still have to
        # reach lib.write_db() at the end.
        output = error.output or ''
    sys.stdout.write(' ' + output)
    output = output.strip()
    match = _SUBMITTED_RE.search(output)
    if match:
        return int(match.group(1)), output
    return None, output


def _merge_cfg_name(script):
    """Return the base name of the cfg file that `script` passes to cmsRun.

    Takes the first line of `script` containing both 'cmsRun' and '.py',
    strips everything up to 'cmsRun ' and keeps the first remaining field.
    Raises subprocess.CalledProcessError if a shell command fails.
    """
    command = ('cat ' + script + r' | grep cmsRun | grep "\.py" | head -1'
               + ' | awk \'{gsub("^.*cmsRun ","");print $1}\'')
    cfg = _run_shell(command)
    cfg = _run_shell('basename ' + cfg)
    return cfg.replace('\n', '')


# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    if 'cmscafspec' in resources:
        # "cmscafspec" found in resources: special cmscaf resources
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        # A queue string derived from 'resources' used to be built here but
        # was always overwritten by this fixed queue before being used.
        resources = '-q cmscafalcamille'
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs
        # (space after -q added for consistency with the other branches)
        resources = '-q ' + resources + ' -m g_cmscaf'
    else:
        resources = '-q ' + resources

    nSub = 0  # number of submission attempts
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'SETUP':
            continue
        if nSub >= args.maxJobs:
            # requested number of jobs reached, nothing more to do
            break

        # submit a new job with 'bsub -J ...' and check output
        # for some reasons LSF wants script with full path
        submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
            (theJobName, resources, theJobData, lib.JOBDIR[i])
        print(submission)
        jobId, result = _bsub(submission)

        # check if job was submitted and update the job database
        if jobId is not None:
            lib.JOBSTATUS[i] = 'SUBTD'
            lib.JOBID[i] = jobId
        else:
            print('Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i], result))
        nSub += 1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        # As for Mille jobs, the queue-derived string that used to be built
        # here was always replaced by this fixed queue.
        resources = '-q cmscafalcamille'
    else:
        resources = '-q ' + resources

    # Request memory for the pede job; the shell joins the quoted pieces
    # into rusage[mem=<pedeMem>].
    resources = resources + ' -R "rusage[mem="%s"]"' % str(lib.pedeMem)

    # Mille jobs occupy indices [0, nJobs); each of them has to be 'OK'
    # or carry a 'DISABLED' status for the merge to be considered safe.
    mergeOK = all(lib.JOBSTATUS[i] == 'OK' or 'DISABLED' in lib.JOBSTATUS[i]
                  for i in range(lib.nJobs))

    # loop over merge jobs, stored after the Mille jobs
    for i in range(lib.nJobs, len(lib.JOBDIR)):
        jobNumFrom1 = i + 1

        # check if current job in SETUP mode or if forced
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' %
                  (jobNumFrom1, lib.JOBSTATUS[i]))
            continue
        if not (mergeOK or args.forceMerge):
            print('Merge job %d not submitted since Mille jobs error/unfinished (Use -m -f to force).' % jobNumFrom1)
            continue

        # some paths for clarity
        Path = '%s/%s' % (theJobData, lib.JOBDIR[i])
        backupScriptPath = Path + '/theScript.sh.bak'
        scriptPath = Path + '/theScript.sh'

        if args.forceMerge:
            # make a backup copy of the script first, if it doesn't already exist.
            if not os.path.isfile(backupScriptPath):
                os.system('cp -p ' + scriptPath + ' ' + backupScriptPath)

            # get the name of merge cfg file -> either the.py or alignment_merge.py
            mergeCfg = _merge_cfg_name(backupScriptPath)

            # make a backup copy of the cfg
            backupCfgPath = Path + '/%s.bak' % mergeCfg
            cfgPath = Path + '/%s' % mergeCfg
            if not os.path.isfile(backupCfgPath):
                os.system('cp -p ' + cfgPath + ' ' + backupCfgPath)

            # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
            inCfgPath = theJobData + '/' + lib.JOBDIR[0] + '/the.py'
            command = 'mps_merge.py -c ' + inCfgPath + ' ' + Path + '/' + mergeCfg + ' ' + Path + ' ' + str(lib.nJobs)
            os.system(command)

            # rewrite theScript.sh using only 'OK' jobs
            command = 'mps_scriptm.pl -c ' + lib.mergeScript + ' ' + scriptPath + ' ' + Path + ' ' + mergeCfg + ' ' + str(lib.nJobs) + ' ' + lib.mssDir + ' ' + lib.mssDirPool
            os.system(command)
        else:
            # restore the backup copy of the script
            if os.path.isfile(backupScriptPath):
                os.system('cp -pf ' + backupScriptPath + ' ' + scriptPath)

            # get the name of merge cfg file
            mergeCfg = _merge_cfg_name(scriptPath)

            # restore the backup copy of the cfg
            backupCfgPath = Path + '/%s.bak' % mergeCfg
            cfgPath = Path + '/%s' % mergeCfg
            if os.path.isfile(backupCfgPath):
                os.system('cp -pf ' + backupCfgPath + ' ' + cfgPath)

        # submit merge job
        nMerge = i - lib.nJobs  # 'index' of this merge job
        curJobName = 'm' + str(nMerge) + '_' + theJobName
        submission = 'bsub -J %s %s %s' % (curJobName, resources, scriptPath)
        jobId, result = _bsub(submission)

        # check if merge job was submitted and update the job database
        if jobId is not None:
            lib.JOBSTATUS[i] = 'SUBTD'
            lib.JOBID[i] = jobId
            print('jobid is %d' % jobId)
        else:
            print('Submission of merge job seems to have failed: %s' % result)


# store the updated job states
lib.write_db()
210 
211 
212