11 from PhysicsTools.HeppyCore.utils.batchmanager
import BatchManager
13 from PhysicsTools.HeppyCore.framework.heppy_loop
import split
16 '''prepare the LSF version of the batch script, to run on LSF''' 17 script =
"""#!/bin/bash 24 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 25 source $VO_CMS_SW_DIR/cmsset_default.sh 30 # ulimit -v 3000000 # NO 31 echo 'copying job dir to worker' 32 eval `scram runtime -sh` 35 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output 38 #echo 'sending the job directory back' 39 #echo cp -r Loop/* $LS_SUBCWD 45 '''prepare the LSF version of the batch script, to run on LSF''' 46 script =
"""#!/bin/bash 50 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 51 source $VO_CMS_SW_DIR/cmsset_default.sh 56 # ulimit -v 3000000 # NO 57 echo 'copying job dir to worker' 59 eval `scramv1 runtime -sh` 60 #eval `scramv1 ru -sh` 62 # eval `scramv1 ru -sh` 66 echo `find . -type d | grep /` 68 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output 71 #echo 'sending the job directory back' 72 #echo cp -r Loop/* $LS_SUBCWD 77 '''prepare the LSF version of the batch script, to run on LSF''' 79 dirCopy =
"""echo 'sending the logs back' # will send also root files if copy failed 80 rm Loop/cmsswPreProcessing.root 81 cp -r Loop/* $LS_SUBCWD 83 echo 'ERROR: problem copying job directory back' 85 echo 'job directory copy succeeded' 90 elif remoteDir.startswith(
"root://eoscms.cern.ch//eos/cms/store/"):
91 cpCmd=
"""echo 'sending root files to remote dir' 92 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH # 93 for f in Loop/*/tree*.root 95 rm Loop/cmsswPreProcessing.root 96 ff=`echo $f | cut -d/ -f2` 97 ff="${{ff}}_`basename $f | cut -d . -f 1`" 100 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 101 source $VO_CMS_SW_DIR/cmsset_default.sh 102 for try in `seq 1 3`; do 103 echo "Stageout try $try" 104 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}" 105 /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm} 106 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root" 107 /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root 108 if [ $? -ne 0 ]; then 109 echo "ERROR: remote copy failed for file $ff" 112 echo "remote copy succeeded" 113 remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3) 114 locsize=$(cat `pwd`/$f | wc -c) 115 ok=$(($remsize==$locsize)) 116 if [ $ok -ne 1 ]; then 117 echo "Problem with copy (file sizes don't match), will retry in 30s" 123 echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url 127 cp -r Loop/* $LS_SUBCWD 128 if [ $? -ne 0 ]; then 129 echo 'ERROR: problem copying job directory back' 131 echo 'job directory copy succeeded' 134 idx = jobDir[jobDir.find(
"_Chunk")+6:].
strip(
"/")
if '_Chunk' in jobDir
else 'all',
135 srm = (
""+remoteDir+jobDir[ jobDir.rfind(
"/") : (jobDir.find(
"_Chunk")
if '_Chunk' in jobDir
else len(jobDir)) ]).
replace(
"root://eoscms.cern.ch/",
"")
138 print "chosen location not supported yet: ", remoteDir
139 print 'path must start with /store/' 142 script =
"""#!/bin/bash 147 # ulimit -v 3000000 # NO 148 echo 'copying job dir to worker' 150 eval `scramv1 ru -sh` 152 # eval `scramv1 ru -sh` 156 cd `find . -type d | grep /` 158 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 167 '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system''' 169 cmssw_release = os.environ[
'CMSSW_BASE']
170 VO_CMS_SW_DIR =
"/swshare/cms" 173 cpCmd=
"""echo 'sending the job directory back' 174 rm Loop/cmsswPreProcessing.root 175 cp -r Loop/* $SUBMISIONDIR""" 176 elif remoteDir.startswith(
"/pnfs/psi.ch"):
177 cpCmd=
"""echo 'sending root files to remote dir' 178 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools 179 for f in Loop/mt2*.root 181 ff=`basename $f | cut -d . -f 1` 182 #d=`echo $f | cut -d / -f 2` 184 echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root" 185 gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root 186 if [ $? -ne 0 ]; then 187 echo "ERROR: remote copy failed for file $ff" 189 echo "remote copy succeeded" 193 rm Loop/cmsswPreProcessing.root 194 cp -r Loop/* $SUBMISIONDIR""".
format(idx=index, srm=
'srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind(
"/"):jobDir.find(
"_Chunk")])
196 print "remote directory not supported yet: ", remoteDir
197 print 'path must start with "/pnfs/psi.ch"' 201 script =
"""#!/bin/bash 203 ##### MONITORING/DEBUG INFORMATION ############################### 204 DATE_START=`date +%s` 205 echo "Job started at " `date` 207 ################################################################ 208 ## QUEUEING SYSTEM SETTINGS: 218 echo "######## Environment Variables ##########" 220 echo "################################################################" 221 TOPWORKDIR=/scratch/`whoami` 222 JOBDIR=sgejob-$JOB_ID 223 WORKDIR=$TOPWORKDIR/$JOBDIR 225 if test -e "$WORKDIR"; then 226 echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2 230 if test ! -d "$WORKDIR"; then 231 echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2 235 #source $VO_CMS_SW_DIR/cmsset_default.sh 236 source {vo}/cmsset_default.sh 237 export SCRAM_ARCH=slc6_amd64_gcc481 240 shopt -s expand_aliases 243 cp -rf $SUBMISIONDIR . 245 cd `find . -type d | grep /` 247 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 248 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck 251 ########################################################################### 253 RUNTIME=$((DATE_END-DATE_START)) 254 echo "################################################################" 255 echo "Job finished at " `date` 256 echo "Wallclock running time: $RUNTIME s" 258 """.
format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
263 '''prepare a IC version of the batch script''' 266 cmssw_release = os.environ[
'CMSSW_BASE']
267 script =
"""#!/bin/bash 268 export X509_USER_PROXY=/home/hep/$USER/myproxy 269 source /vols/cms/grid/setup.sh 272 eval `scramv1 ru -sh` 275 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 277 echo 'sending the job directory back' 278 mv Loop/* ./ && rm -r Loop 279 """.
format(jobdir = jobDir,cmssw = cmssw_release)
283 '''prepare a local version of the batch script, to run using nohup''' 285 script =
"""#!/bin/bash 287 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 289 echo 'sending the job directory back' 296 '''Batch manager specific to cmsRun processes.''' 299 '''Prepare one job. This function is called by the base class.''' 301 print components[value]
304 scriptFileName = jobDir+
'/batchScript.sh' 305 scriptFile = open(scriptFileName,
'w')
306 storeDir = self.remoteOutputDir_.replace(
'/castor/cern.ch/cms',
'')
307 mode = self.RunningMode(options.batch)
311 scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) )
312 elif mode ==
'LOCAL':
314 elif mode ==
'PISA' :
316 elif mode ==
'PADOVA' :
321 os.system(
'chmod +x %s' % scriptFileName)
323 shutil.copyfile(cfgFileName, jobDir+
'/pycfg.py')
326 cfgFile = open(jobDir+
'/config.pck',
'w')
327 pickle.dump( components[value] , cfgFile )
330 if hasattr(self,
"heppyOptions_"):
331 optjsonfile = open(jobDir+
'/options.json',
'w')
332 optjsonfile.write(json.dumps(self.heppyOptions_))
335 if __name__ ==
'__main__':
337 batchManager.parser_.usage=
""" 338 %prog [options] <cfgFile> 340 Run Colin's python analysis system on the batch. 341 Job splitting is determined by your configuration file. 344 options, args = batchManager.ParseOptions()
346 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
347 for opt
in options.extraOptions:
349 (key,val) = opt.split(
"=",1)
350 _heppyGlobalOptions[key] = val
352 _heppyGlobalOptions[opt] =
True 353 batchManager.heppyOptions_=_heppyGlobalOptions
355 cfgFileName = args[0]
357 handle = open(cfgFileName,
'r') 359 cfo = imp.load_source(
"pycfg", cfgFileName, handle)
363 components =
split( [comp
for comp
in config.components
if len(comp.files)>0] )
364 listOfValues = range(0, len(components))
365 listOfNames = [comp.name
for comp
in components]
367 batchManager.PrepareJobs( listOfValues, listOfNames )
369 batchManager.SubmitJobs( waitingTime )
def batchScriptPADOVA(index, jobDir='./')
def batchScriptIC(jobDir)
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
def batchScriptPSI(index, jobDir, remoteDir='')
def PrepareJobUser(self, jobDir, value)
def batchScriptLocal(remoteDir, index)
def batchScriptCERN(jobDir, remoteDir='')