3 from datetime
import datetime
4 from optparse
import OptionParser
15 This class manages batch jobs
31 self.parser_.add_option(
"-o",
"--output-dir", dest=
"outputDir",
32 help=
"Name of the local output directory for your jobs. This directory will be created automatically.",
34 self.parser_.add_option(
"-r",
"--remote-copy", dest=
"remoteCopy",
35 help=
"remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory is tarred and compressed into Logger.tgz, and sent to the remote output directory as well. Afterwards, use logger.py to access the information contained in Logger.tgz. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Logs will be sent back to the submision directory. NOTE: so far this option has been implemented and validated to work only for a remote copy to PSI",
37 self.parser_.add_option(
"-f",
"--force", action=
"store_true",
38 dest=
"force", default=
False,
39 help=
"Don't ask any questions, just over-write")
41 self.parser_.add_option(
"-n",
"--negate", action=
"store_true",
42 dest=
"negate", default=
False,
43 help=
"create jobs, but does not submit the jobs.")
44 self.parser_.add_option(
"-b",
"--batch", dest=
"batch",
45 help=
"batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
46 default=
"bsub -q 8nh < ./batchScript.sh")
47 self.parser_.add_option(
"-p",
"--parametric", action=
"store_true",
48 dest=
"parametric", default=
False,
49 help=
"submit jobs parametrically, implemented for IC so far")
53 (self.options_,self.args_) = self.parser_.parse_args()
54 if self.options_.remoteCopy ==
None:
58 import CMGTools.Production.eostools
as castortools
63 if self.remoteOutputDir_.startswith(
"/pnfs/psi.ch"):
64 ld_lib_path = os.environ.get(
'LD_LIBRARY_PATH')
65 if ld_lib_path !=
"None":
66 os.environ[
'LD_LIBRARY_PATH'] =
"/usr/lib64/:"+ld_lib_path
68 outputDir = self.options_.outputDir.rstrip(
"/").
split(
"/")[-1]
70 today = datetime.today()
71 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M")
74 if ld_lib_path !=
"None":
75 os.environ[
'LD_LIBRARY_PATH'] = ld_lib_path
77 print "remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI"
82 print 'When providing an output directory, you must give its LFN, starting by /store. You gave:'
98 if self.options_.negate
is False and self.options_.force
is False:
105 return (self.options_, self.args_)
109 print 'PREPARING JOBS ======== '
112 if listOfDirNames
is None:
113 for value
in listOfValues:
116 for value, name
in zip( listOfValues, listOfDirNames):
118 print "list of jobs:"
119 pp = pprint.PrettyPrinter(indent=4)
131 outputDir = self.options_.outputDir
134 today = datetime.today()
135 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M%S")
136 print 'output directory not specified, using %s' % outputDir
142 if not self.options_.force:
143 while input !=
'y' and input !=
'n':
144 input = raw_input(
'The directory ' + self.
outputDir_ +
' exists. Are you sure you want to continue? its contents will be overwritten [y/n]' )
154 '''Prepare a job for a given value.
156 calls PrepareJobUser, which should be overloaded by the user.
158 print 'PrepareJob : %s' % value
161 dname =
'Job_{value}'.
format( value=value )
165 self.listOfJobs_.append( jobDir )
169 '''Hook allowing user to define how one of his jobs should be prepared.'''
170 print '\to be customized'
174 '''Submit all jobs. Possibly wait between each job'''
176 if(self.options_.negate):
177 print '*NOT* SUBMITTING JOBS - exit '
179 print 'SUBMITTING JOBS ======== '
186 if mode==
"IC" and self.options_.parametric:
188 jobDirsFile = os.path.join(self.
outputDir_,
"jobDirectories.txt")
189 with open(jobDirsFile,
'w')
as f:
193 readLine =
"readarray JOBDIR < "+jobDirsFile+
"\n"
195 submitScript = os.path.join(self.
outputDir_,
"parametricSubmit.sh")
196 with open(submitScript,
'w')
as batchScript:
197 batchScript.write(
"#!/bin/bash\n")
198 batchScript.write(
"#$ -e /dev/null -o /dev/null \n")
200 batchScript.write(readLine)
201 batchScript.write(
"cd ${JOBDIR[${SGE_TASK_ID}-1]}\n")
202 batchScript.write(
"./batchScript.sh > BATCH_outputLog.txt 2> BATCH_errorLog.txt" )
205 splitBatchOptions = self.options_.batch.split()
206 if '-q' in splitBatchOptions: queue = splitBatchOptions[splitBatchOptions.index(
'-q')+1]
207 else: queue =
"hepshort.q"
209 os.system(
"qsub -q "+queue+
" -t 1-"+str(len(self.
listOfJobs_))+
" "+submitScript)
217 print 'processing ', jobDir
222 print 'waiting %s seconds...' % waitingTimeInSec
223 time.sleep( waitingTimeInSec )
227 '''Hook for job submission.'''
228 print 'submitting (to be customized): ', jobDir
229 os.system( self.options_.batch )
234 if batchScript ==
'':
237 if( os.path.isfile(batchScript)==
False ):
238 print 'file ',batchScript,
' does not exist'
242 ifile = open(batchScript)
244 print 'cannot open input %s' % batchScript
248 p = re.compile(
"\s*cp.*\$jobdir\s+(\S+)$");
251 if os.path.isdir( os.path.expandvars(m.group(1)) ):
252 print 'output directory ', m.group(1),
'already exists!'
256 if self.options_.negate==
False:
257 os.mkdir( os.path.expandvars(m.group(1)) )
259 print 'not making dir', self.options_.negate
264 mkdir =
'mkdir -p %s' % dirname
265 ret = os.system( mkdir )
267 print 'please remove or rename directory: ', dirname
272 '''Returns "LXPLUS", "PSI", "LOCAL", or None,
274 "LXPLUS" : batch command is bsub, and logged on lxplus
275 "PSI" : batch command is qsub, and logged to t3uiXX
276 "IC" : batch command is qsub, and logged to hep.ph.ic.ac.uk
277 "LOCAL" : batch command is nohup.
278 In all other cases, a CmsBatchException is raised
281 hostName = os.environ[
'HOSTNAME']
282 onLxplus = hostName.startswith(
'lxplus')
283 onPSI = hostName.startswith(
't3ui' )
284 onPISA = re.match(
'.*gridui.*',hostName)
or re.match(
'.*faiwn.*',hostName)
285 onPADOVA = ( hostName.startswith(
't2-ui')
and re.match(
'.*pd.infn.*',hostName) )
or ( hostName.startswith(
't2-cld')
and re.match(
'.*lnl.infn.*',hostName) )
286 onIC =
'hep.ph.ic.ac.uk' in hostName
287 batchCmd = batch.split()[0]
289 if batchCmd ==
'bsub':
290 if not (onLxplus
or onPISA
or onPADOVA) :
291 err =
'Cannot run %s on %s' % (batchCmd, hostName)
292 raise ValueError( err )
294 print 'running on LSF pisa : %s from %s' % (batchCmd, hostName)
297 print 'running on LSF padova: %s from %s' % (batchCmd, hostName)
300 print 'running on LSF lxplus: %s from %s' % (batchCmd, hostName)
302 elif batchCmd ==
"qsub":
308 print 'running on IC : %s from %s' % (batchCmd, hostName)
313 print 'running on SGE : %s from %s' % (batchCmd, hostName)
316 elif batchCmd ==
'nohup' or batchCmd ==
'./batchScript.sh':
317 print 'running locally : %s on %s' % (batchCmd, hostName)
320 err =
'unknown batch command: X%sX' % batchCmd
321 raise ValueError( err )
static std::string join(char **cmd)
if(conf.exists("allCellsPositionCalc"))