CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
mps_fire.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Submit jobs that are setup in local mps database to batch system
3 #
4 # The bsub syntax: bsub -J 'jobname' -q 'queue name' theProgram
5 # The jobname will be something like MP_2015.
6 # The queue name is derived from lib.classInfo.
7 # The program is theScript.sh located in each job-directory.
8 # There may be the other option -R (see man bsub for info).
9 #
10 # Usage:
11 #
12 # mps_fire.py [-a] [-m [-f]] [maxjobs]
13 # mps_fire.py -h
14 
15 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
16 import os
17 import sys
18 import subprocess
19 import re
20 import argparse
21 
# Command-line interface: mps_fire.py [-a] [-m [-f]] [maxjobs]
parser = argparse.ArgumentParser(
    description="Submit jobs that are setup in local mps database to batch system.",
)
# Optional positional argument; overridden by lib.nJobs when -a/--all is given.
parser.add_argument("maxJobs", type=int, nargs='?', default=1,
                    help="number of Mille jobs to be submitted (default: %(default)d)")
parser.add_argument("-a", "--all", dest="allMille", default=False,
                    action="store_true",
                    help="submit all setup Mille jobs; maxJobs is ignored")
parser.add_argument("-m", "--merge", dest="fireMerge", default=False,
                    action="store_true",
                    help="submit all setup Pede jobs; maxJobs is ignored")
parser.add_argument("-f", "--force-merge", dest="forceMerge", default=False,
                    action="store_true",
                    help=("force the submission of the Pede job in case some " +
                          "Mille jobs are not in the OK state"))
args = parser.parse_args(sys.argv[1:])
38 
39 
# Load the local mps job database.
lib = mpslib.jobdatabase()
lib.read_db()

# With -a/--all every Mille job is submitted and the user's 'maxJobs' is ignored.
if args.allMille:
    args.maxJobs = lib.nJobs

# Absolute path of the job directory tree (needed by mps_script).
theJobData = os.path.join(os.getcwd(), "jobData")

# Name handed to the batch system: lib.addFiles when it is non-empty,
# otherwise the default 'mpalign'.
theJobName = lib.addFiles if lib.addFiles != '' else 'mpalign'
54 
# Message bsub prints on a successful submission; group 1 is the batch job id.
_JOB_SUBMITTED = re.compile(r'Job <(\d+)> is submitted')


def _submit(command):
    """Run a bsub command line through the shell and return its output.

    stderr is merged into the returned output.  If the command exits with a
    non-zero status an empty string is returned, so that the caller's search
    for the 'is submitted' message fails and the job is reported as not
    submitted instead of aborting the script before the database is written.
    """
    try:
        return subprocess.check_output(command,
                                       stderr=subprocess.STDOUT,
                                       shell=True)
    except subprocess.CalledProcessError:
        return ""


def _merge_cfg_name(script):
    """Return the base name of the merge cfg file referenced in `script`.

    Takes the first line of `script` that contains both 'cmsRun' and '.py',
    drops everything up to and including 'cmsRun ', keeps the first remaining
    field and reduces it to its base name
    (-> either the.py or alignment_merge.py).
    """
    command = ('cat ' + script + ' | grep cmsRun | grep "\\.py" | head -1 | '
               'awk \'{gsub("^.*cmsRun ","");print $1}\'')
    cfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
    cfg = subprocess.check_output('basename ' + cfg,
                                  stderr=subprocess.STDOUT, shell=True)
    return cfg.replace('\n', '')


# fire the 'normal' parallel Jobs (Mille Jobs)
if not args.fireMerge:
    # set the resources string coming from mps.db
    resources = lib.get_class('mille')

    # "cmscafspec" found in resources: special cmscaf resources
    if 'cmscafspec' in resources:
        print('\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n')
        # FIXME why? The queue configured in mps.db is ignored in this case
        # and a fixed queue is used instead.
        resources = '-q cmscafalcamille'
    # "cmscaf" found in resources
    elif 'cmscaf' in resources:
        # g_cmscaf for ordinary caf queue, keeping 'cmscafspec' free for pede jobs:
        resources = '-q ' + resources + ' -m g_cmscaf'
    else:
        resources = '-q ' + resources

    nSub = 0  # number of submission attempts made so far
    for i in range(lib.nJobs):
        if lib.JOBSTATUS[i] != 'SETUP':
            continue
        if nSub >= args.maxJobs:
            # requested number of jobs reached, nothing left to do
            break

        # submit a new job with 'bsub -J ...' and check output
        # for some reasons LSF wants script with full path
        submission = 'bsub -J %s %s %s/%s/theScript.sh' % \
            (theJobName, resources, theJobData, lib.JOBDIR[i])
        print(submission)
        result = _submit(submission)
        # echo the raw bsub output without adding another line break
        sys.stdout.write(' ' + result)
        result = result.strip()

        # check if job was submitted and update the job database
        match = _JOB_SUBMITTED.search(result)
        if match:
            # need standard format for job number
            lib.JOBSTATUS[i] = 'SUBTD'
            lib.JOBID[i] = int(match.group(1))
        else:
            print('Submission of %03d seems to have failed: %s' %
                  (lib.JOBNUMBER[i], result))
        nSub += 1

# fire the merge job
else:
    print('fire merge')
    # set the resources string coming from mps.db
    resources = lib.get_class('pede')
    if 'cmscafspec' in resources:
        # FIXME why? The queue configured in mps.db is ignored in this case
        # and a fixed queue is used instead.
        resources = '-q cmscafalcamille'
    else:
        resources = '-q ' + resources

    # Allocate memory for pede job FIXME check documentation for bsub!!!!!
    resources += ' -R "rusage[mem="%s"]"' % str(lib.pedeMem)  # FIXME the dots? -> see .pl

    # check whether all Mille jobs are OK (disabled ones do not count)
    mergeOK = all(status == 'OK' or 'DISABLED' in status
                  for status in lib.JOBSTATUS[:lib.nJobs])

    # loop over merge jobs: they are stored behind the lib.nJobs Mille jobs
    for i in range(lib.nJobs, len(lib.JOBDIR)):
        jobNumFrom1 = i + 1

        # only submit if the current job is in SETUP mode ...
        if lib.JOBSTATUS[i] != 'SETUP':
            print('Merge job %d status %s not submitted.' %
                  (jobNumFrom1, lib.JOBSTATUS[i]))
            continue
        # ... and the Mille jobs are fine, unless the submission is forced
        if not (mergeOK or args.forceMerge):
            print('Merge job %d not submitted since Mille jobs error/unfinished (Use -m -f to force).' % jobNumFrom1)
            continue

        # some paths for clarity
        Path = '%s/%s' % (theJobData, lib.JOBDIR[i])
        backupScriptPath = Path + '/theScript.sh.bak'
        scriptPath = Path + '/theScript.sh'

        # force option invoked:
        if args.forceMerge:
            # make a backup copy of the script first, if it doesn't already exist.
            if not os.path.isfile(backupScriptPath):
                os.system('cp -p ' + scriptPath + ' ' + backupScriptPath)

            # get the name of merge cfg file (read from the backup script)
            mergeCfg = _merge_cfg_name(backupScriptPath)

            # make a backup copy of the cfg
            backupCfgPath = Path + '/%s.bak' % mergeCfg
            cfgPath = Path + '/%s' % mergeCfg
            if not os.path.isfile(backupCfgPath):
                os.system('cp -p ' + cfgPath + ' ' + backupCfgPath)

            # rewrite the mergeCfg using only 'OK' jobs (uses first mille-job as baseconfig)
            inCfgPath = theJobData + '/' + lib.JOBDIR[0] + '/the.py'
            command = 'mps_merge.py -c ' + inCfgPath + ' ' + Path + '/' + mergeCfg + ' ' + Path + ' ' + str(lib.nJobs)
            os.system(command)

            # rewrite theScript.sh using only 'OK' jobs
            command = 'mps_scriptm.pl -c ' + lib.mergeScript + ' ' + scriptPath + ' ' + Path + ' ' + mergeCfg + ' ' + str(lib.nJobs) + ' ' + lib.mssDir + ' ' + lib.mssDirPool
            os.system(command)

        else:
            # restore the backup copy of the script
            if os.path.isfile(backupScriptPath):
                os.system('cp -pf ' + backupScriptPath + ' ' + scriptPath)

            # get the name of merge cfg file
            mergeCfg = _merge_cfg_name(scriptPath)

            # restore the backup copy of the cfg
            backupCfgPath = Path + '/%s.bak' % mergeCfg
            cfgPath = Path + '/%s' % mergeCfg
            if os.path.isfile(backupCfgPath):
                os.system('cp -pf ' + backupCfgPath + ' ' + cfgPath)

        # end of if/else forceMerge

        # submit merge job
        nMerge = i - lib.nJobs  # 'index' of this merge job
        curJobName = 'm' + str(nMerge) + '_' + theJobName
        submission = 'bsub -J %s %s %s' % (curJobName, resources, scriptPath)
        # a failing bsub must not abort the script: statuses of jobs that
        # were already submitted still have to reach lib.write_db() below
        result = _submit(submission)
        sys.stdout.write(' ' + result)
        result = result.strip()

        # check if merge job was submitted and update the job database
        match = _JOB_SUBMITTED.search(result)
        if match:
            # need standard format for job number
            lib.JOBSTATUS[i] = 'SUBTD'
            lib.JOBID[i] = int(match.group(1))
            print('jobid is %d' % lib.JOBID[i])
        else:
            print('Submission of merge job seems to have failed: ' + result)
    # end of loop on merge jobs


# store the updated job states and batch job ids
lib.write_db()
215 
216 
217