3 from __future__
import print_function
4 from datetime
import datetime
5 from optparse
import OptionParser
13 import eostools
as castortools
17 This class manages batch jobs 33 self.parser_.add_option(
"-o",
"--output-dir", dest=
"outputDir",
34 help=
"Name of the local output directory for your jobs. This directory will be created automatically.",
36 self.parser_.add_option(
"-r",
"--remote-copy", dest=
"remoteCopy",
37 help=
"remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
39 self.parser_.add_option(
"-f",
"--force", action=
"store_true",
40 dest=
"force", default=
False,
41 help=
"Don't ask any questions, just over-write")
43 self.parser_.add_option(
"-n",
"--negate", action=
"store_true",
44 dest=
"negate", default=
False,
45 help=
"create jobs, but does not submit the jobs.")
46 self.parser_.add_option(
"-b",
"--batch", dest=
"batch",
47 help=
"batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
48 default=
"bsub -q 8nh < ./batchScript.sh")
49 self.parser_.add_option(
"--option",
54 help=
"Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")
57 (self.options_,self.args_) = self.parser_.parse_args()
58 if self.options_.remoteCopy ==
None:
65 if self.remoteOutputDir_.startswith(
"/pnfs/psi.ch"):
66 ld_lib_path = os.environ.get(
'LD_LIBRARY_PATH')
67 if ld_lib_path !=
"None":
68 os.environ[
'LD_LIBRARY_PATH'] =
"/usr/lib64/:"+ld_lib_path
70 outputDir = self.options_.outputDir
72 today = datetime.today()
73 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M")
76 if ld_lib_path !=
"None":
77 os.environ[
'LD_LIBRARY_PATH'] = ld_lib_path
79 print(
"remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
84 print(
'When providing an output directory, you must give its LFN, starting by /store. You gave:')
100 if self.options_.negate
is False and self.options_.force
is False:
108 return (self.options_, self.args_)
112 print(
'PREPARING JOBS ======== ')
115 if listOfDirNames
is None:
116 for value
in listOfValues:
119 for value, name
in zip( listOfValues, listOfDirNames):
121 print(
"list of jobs:")
122 pp = pprint.PrettyPrinter(indent=4)
134 outputDir = self.options_.outputDir
137 today = datetime.today()
138 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M%S")
139 print(
'output directory not specified, using %s' % outputDir)
145 if not self.options_.force:
146 while input !=
'y' and input !=
'n':
147 input = raw_input(
'The directory ' + self.
outputDir_ +
' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ' )
157 '''Prepare a job for a given value. 159 calls PrepareJobUser, which should be overridden by the user. 161 print(
'PrepareJob : %s' % value)
164 dname =
'Job_{value}'.
format( value=value )
168 self.listOfJobs_.append( jobDir )
172 '''Hook allowing user to define how one of his jobs should be prepared.''' 173 print(
'\to be customized')
177 '''Submit all jobs. Possibly wait between each job''' 179 if(self.options_.negate):
180 print(
'*NOT* SUBMITTING JOBS - exit ')
182 print(
'SUBMITTING JOBS ======== ')
186 print(
'processing ', jobDir)
191 print(
'waiting %s seconds...' % waitingTimeInSec)
192 time.sleep( waitingTimeInSec )
196 '''Hook for job submission.''' 197 print(
'submitting (to be customized): ', jobDir)
198 os.system( self.options_.batch )
202 '''Hook for array job submission.''' 203 print(
'Submitting array with %s jobs' % numbOfJobs)
207 if batchScript ==
'':
210 if( os.path.isfile(batchScript)==
False ):
211 print(
'file ',batchScript,
' does not exist')
215 ifile = open(batchScript)
217 print(
'cannot open input %s' % batchScript)
221 p = re.compile(
"\s*cp.*\$jobdir\s+(\S+)$");
224 if os.path.isdir( os.path.expandvars(m.group(1)) ):
225 print(
'output directory ', m.group(1),
'already exists!')
229 if self.options_.negate==
False:
230 os.mkdir( os.path.expandvars(m.group(1)) )
232 print(
'not making dir', self.options_.negate)
237 mkdir =
'mkdir -p %s' % dirname
238 ret = os.system( mkdir )
240 print(
'please remove or rename directory: ', dirname)
246 '''Return "LXPLUS", "PSI", "NAF", "LOCAL", or None, 248 "LXPLUS" : batch command is bsub, and logged on lxplus 249 "PSI" : batch command is qsub, and logged to t3uiXX 250 "NAF" : batch command is qsub, and logged on naf 251 "IC" : batch command is qsub, and logged on hep.ph.ic.ac.uk 252 "LOCAL" : batch command is nohup. 254 In all other cases, a ValueError is raised 257 hostName = os.environ[
'HOSTNAME']
259 onLxplus = hostName.startswith(
'lxplus')
260 onPSI = hostName.startswith(
't3ui')
261 onNAF = hostName.startswith(
'naf')
263 batchCmd = batch.split()[0]
265 if batchCmd ==
'bsub':
267 err =
'Cannot run %s on %s' % (batchCmd, hostName)
268 raise ValueError( err )
270 print(
'running on LSF : %s from %s' % (batchCmd, hostName))
273 elif batchCmd ==
"qsub":
275 print(
'running on SGE : %s from %s' % (batchCmd, hostName))
278 print(
'running on NAF : %s from %s' % (batchCmd, hostName))
281 print(
'running on IC : %s from %s' % (batchCmd, hostName))
284 err =
'Cannot run %s on %s' % (batchCmd, hostName)
285 raise ValueError( err )
287 elif batchCmd ==
'nohup' or batchCmd ==
'./batchScript.sh':
288 print(
'running locally : %s on %s' % (batchCmd, hostName))
291 err =
'unknown batch command: X%sX' % batchCmd
292 raise ValueError( err )
def PrepareJob(self, value, dirname=None)
S & print(S &os, JobReport::InputFile const &f)
def SubmitJobArray(self, numbOfJobs=1)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def PrepareJobs(self, listOfValues, listOfDirNames=None)
static std::string join(char **cmd)
def CheckBatchScript(self, batchScript)
def ManageOutputDir(self)
def SubmitJobs(self, waitingTimeInSec=0)
def SubmitJob(self, jobDir)
def RunningMode(self, batch)
def PrepareJobUser(self, value)