15 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass
as mpslib
16 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools
as mps_tools
26 """Forward proxy to location visible from the batch system. 29 - `rundir`: directory for storing the forwarded proxy 34 subprocess.check_call([
"voms-proxy-info",
"--exists"])
35 except subprocess.CalledProcessError:
36 print "Please initialize your proxy before submitting." 39 local_proxy = subprocess.check_output([
"voms-proxy-info",
"--path"]).
strip()
40 shutil.copyfile(local_proxy, os.path.join(rundir,
".user_proxy"))
46 description=
"Submit jobs that are setup in local mps database to batch system.",
48 parser.add_argument(
"maxJobs", type=int, nargs=
'?', default=1,
49 help=
"number of Mille jobs to be submitted (default: %(default)d)")
50 parser.add_argument(
"-a",
"--all", dest=
"allMille", default=
False,
52 help=
"submit all setup Mille jobs; maxJobs is ignored")
53 parser.add_argument(
"-m",
"--merge", dest=
"fireMerge", default=
False,
55 help=
"submit all setup Pede jobs; maxJobs is ignored")
56 parser.add_argument(
"-f",
"--force-merge", dest=
"forceMerge", default=
False,
58 help=(
"force the submission of the Pede job in case some "+
59 "Mille jobs are not in the OK state"))
60 parser.add_argument(
"-p",
"--forward-proxy", dest=
"forwardProxy", default=
False,
62 help=
"forward VOMS proxy to batch system")
63 args = parser.parse_args(sys.argv[1:])
66 lib = mpslib.jobdatabase()
71 args.maxJobs = lib.nJobs
74 theJobData = os.path.join(os.getcwd(),
"jobData")
77 theJobName =
'mpalign' 78 if lib.addFiles !=
'':
79 theJobName = lib.addFiles
82 if not args.fireMerge:
84 resources = lib.get_class(
'mille')
87 if 'cmscafspec' in resources:
88 print '\nWARNING:\n Running mille jobs on cmscafspec, intended for pede only!\n\n' 90 queue = queue.replace(
'cmscafspec',
'cmscaf')
91 resources =
'-q'+queue+
'-R cmscafspec' 92 resources =
'-q cmscafalcamille' 94 elif 'cmscaf' in resources:
96 resources =
'-q'+resources+
' -m g_cmscaf' 98 resources =
'-q '+resources
101 for i
in xrange(lib.nJobs):
102 if lib.JOBSTATUS[i] ==
'SETUP':
103 if nSub < args.maxJobs:
104 if args.forwardProxy:
109 submission =
'bsub -J %s %s %s/%s/theScript.sh' % \
110 (theJobName, resources, theJobData, lib.JOBDIR[i])
113 result = subprocess.check_output(submission,
114 stderr=subprocess.STDOUT,
116 except subprocess.CalledProcessError
as e:
119 result = result.strip()
122 match = re.search(
'Job <(\d+)> is submitted', result)
125 lib.JOBSTATUS[i] =
'SUBTD' 126 lib.JOBID[i] =
int(match.group(1))
128 print 'Submission of %03d seems to have failed: %s' % (lib.JOBNUMBER[i],result),
135 resources = lib.get_class(
'pede')
136 if 'cmscafspec' in resources:
138 queue = queue.replace(
'cmscafspec',
'cmscaf')
139 resources =
'-q '+queue+
' -R cmscafspec' 140 resources =
'-q cmscafalcamille' 142 resources =
'-q '+resources
145 resources = resources+
' -R \"rusage[mem="%s"]\"' %
str(lib.pedeMem)
149 for i
in xrange(lib.nJobs):
150 if lib.JOBSTATUS[i] !=
'OK':
151 if 'DISABLED' not in lib.JOBSTATUS[i]:
157 while i<len(lib.JOBDIR):
161 if lib.JOBSTATUS[i] !=
'SETUP':
162 print 'Merge job %d status %s not submitted.' % \
163 (jobNumFrom1, lib.JOBSTATUS[i])
164 elif not (mergeOK
or args.forceMerge):
165 print 'Merge job',jobNumFrom1,
'not submitted since Mille jobs error/unfinished (Use -m -f to force).' 168 Path =
'%s/%s' % (theJobData,lib.JOBDIR[i])
169 backupScriptPath = Path+
'/theScript.sh.bak' 170 scriptPath = Path+
'/theScript.sh' 176 if not os.path.isfile(backupScriptPath):
177 os.system(
'cp -p '+scriptPath+
' '+backupScriptPath)
180 command =
'cat '+backupScriptPath+
' | grep CONFIG_FILE | head -1 | awk -F"/" \'{print $NF}\'' 181 mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=
True)
182 mergeCfg = mergeCfg.strip()
185 backupCfgPath = Path+
'/%s.bak' % mergeCfg
186 cfgPath = Path+
'/%s' % mergeCfg
187 if not os.path.isfile(backupCfgPath):
188 os.system(
'cp -p '+cfgPath+
' '+backupCfgPath)
191 with open(os.path.join(Path,
".weights.pkl"),
"rb")
as f:
192 weight_conf = cPickle.load(f)
195 mps_tools.run_checked([
"mps_weight.pl",
"-c"])
198 for name,weight
in weight_conf:
199 print " ".
join([
"mps_weight.pl",
"-N", name, weight])
200 mps_tools.run_checked([
"mps_weight.pl",
"-N", name, weight])
203 inCfgPath = theJobData+
'/'+lib.JOBDIR[0]+
'/the.py' 204 command =
'mps_merge.py -w -c '+inCfgPath+
' '+Path+
'/'+mergeCfg+
' '+Path+
' '+
str(lib.nJobs)
208 command =
'mps_scriptm.pl -c '+lib.mergeScript+
' '+scriptPath+
' '+Path+
' '+mergeCfg+
' '+
str(lib.nJobs)+
' '+lib.mssDir+
' '+lib.mssDirPool
213 if os.path.isfile(backupScriptPath):
214 os.system(
'cp -pf '+backupScriptPath+
' '+scriptPath)
217 command =
"cat "+scriptPath+
" | grep '^\s*CONFIG_FILE' | awk -F'=' '{print $2}'" 218 mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=
True)
219 command =
'basename '+mergeCfg
220 mergeCfg = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=
True)
221 mergeCfg = mergeCfg.replace(
'\n',
'')
224 backupCfgPath = Path+
'/%s.bak' % mergeCfg
225 cfgPath = Path+
'/%s' % mergeCfg
226 if os.path.isfile(backupCfgPath):
227 os.system(
'cp -pf '+backupCfgPath+
' '+cfgPath)
233 curJobName =
'm'+
str(nMerge)+
'_'+theJobName
234 if args.forwardProxy:
forward_proxy(os.path.dirname(scriptPath))
235 submission =
'bsub -J %s %s %s' % (curJobName,resources,scriptPath)
236 result = subprocess.check_output(submission, stderr=subprocess.STDOUT, shell=
True)
238 result = result.strip()
241 match = re.search(
'Job <(\d+)> is submitted', result)
244 lib.JOBSTATUS[i] =
'SUBTD' 245 lib.JOBID[i] =
int(match.group(1))
246 print 'jobid is',lib.JOBID[i]
248 print 'Submission of merge job seems to have failed:',result,
def forward_proxy(rundir)
static std::string join(char **cmd)