3 from __future__
import print_function
4 from __future__
import absolute_import
5 from datetime
import datetime
6 from optparse
import OptionParser
14 from .
import eostools
as castortools
18 This class manages batch jobs 34 self.parser_.add_option(
"-o",
"--output-dir", dest=
"outputDir",
35 help=
"Name of the local output directory for your jobs. This directory will be created automatically.",
37 self.parser_.add_option(
"-r",
"--remote-copy", dest=
"remoteCopy",
38 help=
"remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
40 self.parser_.add_option(
"-f",
"--force", action=
"store_true",
41 dest=
"force", default=
False,
42 help=
"Don't ask any questions, just over-write")
44 self.parser_.add_option(
"-n",
"--negate", action=
"store_true",
45 dest=
"negate", default=
False,
46 help=
"create jobs, but does not submit the jobs.")
47 self.parser_.add_option(
"-b",
"--batch", dest=
"batch",
48 help=
"batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
49 default=
"bsub -q 8nh < ./batchScript.sh")
50 self.parser_.add_option(
"--option",
55 help=
"Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")
58 (self.options_,self.args_) = self.parser_.parse_args()
59 if self.options_.remoteCopy ==
None:
66 if self.remoteOutputDir_.startswith(
"/pnfs/psi.ch"):
67 ld_lib_path = os.environ.get(
'LD_LIBRARY_PATH')
68 if ld_lib_path !=
"None":
69 os.environ[
'LD_LIBRARY_PATH'] =
"/usr/lib64/:"+ld_lib_path
71 outputDir = self.options_.outputDir
73 today = datetime.today()
74 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M")
77 if ld_lib_path !=
"None":
78 os.environ[
'LD_LIBRARY_PATH'] = ld_lib_path
80 print(
"remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
85 print(
'When providing an output directory, you must give its LFN, starting by /store. You gave:')
101 if self.options_.negate
is False and self.options_.force
is False:
109 return (self.options_, self.args_)
113 print(
'PREPARING JOBS ======== ')
116 if listOfDirNames
is None:
117 for value
in listOfValues:
120 for value, name
in zip( listOfValues, listOfDirNames):
122 print(
"list of jobs:")
123 pp = pprint.PrettyPrinter(indent=4)
135 outputDir = self.options_.outputDir
138 today = datetime.today()
139 outputDir =
'OutCmsBatch_%s' % today.strftime(
"%d%h%y_%H%M%S")
140 print(
'output directory not specified, using %s' % outputDir)
146 if not self.options_.force:
147 while input !=
'y' and input !=
'n':
148 input = raw_input(
'The directory ' + self.
outputDir_ +
' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ' )
158 '''Prepare a job for a given value. 160 calls PrepareJobUser, which should be overloaded by the user. 162 print(
'PrepareJob : %s' % value)
165 dname =
'Job_{value}'.
format( value=value )
169 self.listOfJobs_.append( jobDir )
173 '''Hook allowing user to define how one of his jobs should be prepared.''' 174 print(
'\to be customized')
178 '''Submit all jobs. Possibly wait between each job''' 180 if(self.options_.negate):
181 print(
'*NOT* SUBMITTING JOBS - exit ')
183 print(
'SUBMITTING JOBS ======== ')
187 print(
'processing ', jobDir)
192 print(
'waiting %s seconds...' % waitingTimeInSec)
193 time.sleep( waitingTimeInSec )
197 '''Hook for job submission.''' 198 print(
'submitting (to be customized): ', jobDir)
199 os.system( self.options_.batch )
203 '''Hook for array job submission.''' 204 print(
'Submitting array with %s jobs' % numbOfJobs)
208 if batchScript ==
'':
211 if( os.path.isfile(batchScript)==
False ):
212 print(
'file ',batchScript,
' does not exist')
216 ifile = open(batchScript)
218 print(
'cannot open input %s' % batchScript)
222 p = re.compile(
"\s*cp.*\$jobdir\s+(\S+)$");
225 if os.path.isdir( os.path.expandvars(m.group(1)) ):
226 print(
'output directory ', m.group(1),
'already exists!')
230 if self.options_.negate==
False:
231 os.mkdir( os.path.expandvars(m.group(1)) )
233 print(
'not making dir', self.options_.negate)
238 mkdir =
'mkdir -p %s' % dirname
239 ret = os.system( mkdir )
241 print(
'please remove or rename directory: ', dirname)
247 '''Return "LXPUS", "PSI", "NAF", "LOCAL", or None, 249 "LXPLUS" : batch command is bsub, and logged on lxplus 250 "PSI" : batch command is qsub, and logged to t3uiXX 251 "NAF" : batch command is qsub, and logged on naf 252 "IC" : batch command is qsub, and logged on hep.ph.ic.ac.uk 253 "LOCAL" : batch command is nohup. 255 In all other cases, a CmsBatchException is raised 258 hostName = os.environ[
'HOSTNAME']
260 onLxplus = hostName.startswith(
'lxplus')
261 onPSI = hostName.startswith(
't3ui')
262 onNAF = hostName.startswith(
'naf')
264 batchCmd = batch.split()[0]
266 if batchCmd ==
'bsub':
268 err =
'Cannot run %s on %s' % (batchCmd, hostName)
269 raise ValueError( err )
271 print(
'running on LSF : %s from %s' % (batchCmd, hostName))
274 elif batchCmd ==
"qsub":
276 print(
'running on SGE : %s from %s' % (batchCmd, hostName))
279 print(
'running on NAF : %s from %s' % (batchCmd, hostName))
282 print(
'running on IC : %s from %s' % (batchCmd, hostName))
285 err =
'Cannot run %s on %s' % (batchCmd, hostName)
286 raise ValueError( err )
288 elif batchCmd ==
'nohup' or batchCmd ==
'./batchScript.sh':
289 print(
'running locally : %s on %s' % (batchCmd, hostName))
292 err =
'unknown batch command: X%sX' % batchCmd
293 raise ValueError( err )
def PrepareJob(self, value, dirname=None)
S & print(S &os, JobReport::InputFile const &f)
def SubmitJobArray(self, numbOfJobs=1)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def PrepareJobs(self, listOfValues, listOfDirNames=None)
static std::string join(char **cmd)
def CheckBatchScript(self, batchScript)
def ManageOutputDir(self)
def SubmitJobs(self, waitingTimeInSec=0)
def SubmitJob(self, jobDir)
def RunningMode(self, batch)
def PrepareJobUser(self, value)