3 from datetime
import datetime
4 from optparse
import OptionParser
12 import eostools
as castortools
16 This class manages batch jobs 32 self.parser_.add_option(
"-o",
"--output-dir", dest=
"outputDir",
33 help=
"Name of the local output directory for your jobs. This directory will be created automatically.",
35 self.parser_.add_option(
"-r",
"--remote-copy", dest=
"remoteCopy",
36 help=
"remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
38 self.parser_.add_option(
"-f",
"--force", action=
"store_true",
39 dest=
"force", default=
False,
40 help=
"Don't ask any questions, just over-write")
42 self.parser_.add_option(
"-n",
"--negate", action=
"store_true",
43 dest=
"negate", default=
False,
44 help=
"create jobs, but does not submit the jobs.")
45 self.parser_.add_option(
"-b",
"--batch", dest=
"batch",
46 help=
"batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
47 default=
"bsub -q 8nh < ./batchScript.sh")
48 self.parser_.add_option(
"--option",
53 help=
"Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")
56 (self.options_,self.args_) = self.parser_.parse_args()
57 if self.options_.remoteCopy ==
None:
64 if self.remoteOutputDir_.startswith(
"/pnfs/psi.ch"):
65 ld_lib_path = os.environ.get(
'LD_LIBRARY_PATH')
66 if ld_lib_path !=
"None":
67 os.environ[
'LD_LIBRARY_PATH'] =
"/usr/lib64/:"+ld_lib_path
69 outputDir = self.options_.outputDir
71 today = datetime.today()
72 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M")
75 if ld_lib_path !=
"None":
76 os.environ[
'LD_LIBRARY_PATH'] = ld_lib_path
78 print "remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI" 83 print 'When providing an output directory, you must give its LFN, starting by /store. You gave:' 99 if self.options_.negate
is False and self.options_.force
is False:
107 return (self.options_, self.args_)
111 print 'PREPARING JOBS ======== ' 114 if listOfDirNames
is None:
115 for value
in listOfValues:
118 for value, name
in zip( listOfValues, listOfDirNames):
120 print "list of jobs:" 121 pp = pprint.PrettyPrinter(indent=4)
133 outputDir = self.options_.outputDir
136 today = datetime.today()
137 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M%S")
138 print 'output directory not specified, using %s' % outputDir
144 if not self.options_.force:
145 while input !=
'y' and input !=
'n':
146 input = raw_input(
'The directory ' + self.
outputDir_ +
' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ' )
156 '''Prepare a job for a given value. 158 calls PrepareJobUser, which should be overloaded by the user. 160 print 'PrepareJob : %s' % value
163 dname =
'Job_{value}'.
format( value=value )
167 self.listOfJobs_.append( jobDir )
171 '''Hook allowing user to define how one of his jobs should be prepared.''' 172 print '\to be customized' 176 '''Submit all jobs. Possibly wait between each job''' 178 if(self.options_.negate):
179 print '*NOT* SUBMITTING JOBS - exit ' 181 print 'SUBMITTING JOBS ======== ' 185 print 'processing ', jobDir
190 print 'waiting %s seconds...' % waitingTimeInSec
191 time.sleep( waitingTimeInSec )
195 '''Hook for job submission.''' 196 print 'submitting (to be customized): ', jobDir
197 os.system( self.options_.batch )
201 '''Hook for array job submission.''' 202 print 'Submitting array with %s jobs' % numbOfJobs
206 if batchScript ==
'':
209 if( os.path.isfile(batchScript)==
False ):
210 print 'file ',batchScript,
' does not exist' 214 ifile = open(batchScript)
216 print 'cannot open input %s' % batchScript
220 p = re.compile(
"\s*cp.*\$jobdir\s+(\S+)$");
223 if os.path.isdir( os.path.expandvars(m.group(1)) ):
224 print 'output directory ', m.group(1),
'already exists!' 228 if self.options_.negate==
False:
229 os.mkdir( os.path.expandvars(m.group(1)) )
231 print 'not making dir', self.options_.negate
236 mkdir =
'mkdir -p %s' % dirname
237 ret = os.system( mkdir )
239 print 'please remove or rename directory: ', dirname
245 '''Return "LXPUS", "PSI", "NAF", "LOCAL", or None, 247 "LXPLUS" : batch command is bsub, and logged on lxplus 248 "PSI" : batch command is qsub, and logged to t3uiXX 249 "NAF" : batch command is qsub, and logged on naf 250 "IC" : batch command is qsub, and logged on hep.ph.ic.ac.uk 251 "LOCAL" : batch command is nohup. 253 In all other cases, a CmsBatchException is raised 256 hostName = os.environ[
'HOSTNAME']
258 onLxplus = hostName.startswith(
'lxplus')
259 onPSI = hostName.startswith(
't3ui')
260 onNAF = hostName.startswith(
'naf')
262 batchCmd = batch.split()[0]
264 if batchCmd ==
'bsub':
266 err =
'Cannot run %s on %s' % (batchCmd, hostName)
267 raise ValueError( err )
269 print 'running on LSF : %s from %s' % (batchCmd, hostName)
272 elif batchCmd ==
"qsub":
274 print 'running on SGE : %s from %s' % (batchCmd, hostName)
277 print 'running on NAF : %s from %s' % (batchCmd, hostName)
280 print 'running on IC : %s from %s' % (batchCmd, hostName)
283 err =
'Cannot run %s on %s' % (batchCmd, hostName)
284 raise ValueError( err )
286 elif batchCmd ==
'nohup' or batchCmd ==
'./batchScript.sh':
287 print 'running locally : %s on %s' % (batchCmd, hostName)
290 err =
'unknown batch command: X%sX' % batchCmd
291 raise ValueError( err )
def PrepareJob(self, value, dirname=None)
def SubmitJobArray(self, numbOfJobs=1)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def PrepareJobs(self, listOfValues, listOfDirNames=None)
static std::string join(char **cmd)
def CheckBatchScript(self, batchScript)
def ManageOutputDir(self)
def SubmitJobs(self, waitingTimeInSec=0)
def SubmitJob(self, jobDir)
def RunningMode(self, batch)
def PrepareJobUser(self, value)