3 from __future__
import print_function
12 from PhysicsTools.HeppyCore.utils.batchmanager
import BatchManager
14 from PhysicsTools.HeppyCore.framework.heppy_loop
import split
17 '''prepare the LSF version of the batch script, to run on LSF''' 18 script =
"""#!/bin/bash 25 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 26 source $VO_CMS_SW_DIR/cmsset_default.sh 31 # ulimit -v 3000000 # NO 32 echo 'copying job dir to worker' 33 eval `scram runtime -sh` 36 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output 39 #echo 'sending the job directory back' 40 #echo cp -r Loop/* $LS_SUBCWD 46 '''prepare the LSF version of the batch script, to run on LSF''' 47 script =
"""#!/bin/bash 51 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 52 source $VO_CMS_SW_DIR/cmsset_default.sh 57 # ulimit -v 3000000 # NO 58 echo 'copying job dir to worker' 60 eval `scramv1 runtime -sh` 61 #eval `scramv1 ru -sh` 63 # eval `scramv1 ru -sh` 67 echo `find . -type d | grep /` 69 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output 72 #echo 'sending the job directory back' 73 #echo cp -r Loop/* $LS_SUBCWD 78 '''prepare the LSF version of the batch script, to run on LSF''' 80 dirCopy =
"""echo 'sending the logs back' # will send also root files if copy failed 81 rm Loop/cmsswPreProcessing.root 82 cp -r Loop/* $LS_SUBCWD 84 echo 'ERROR: problem copying job directory back' 86 echo 'job directory copy succeeded' 91 elif remoteDir.startswith(
"root://eoscms.cern.ch//eos/cms/store/"):
92 cpCmd=
"""echo 'sending root files to remote dir' 93 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH # 94 for f in Loop/*/tree*.root 96 rm Loop/cmsswPreProcessing.root 97 ff=`echo $f | cut -d/ -f2` 98 ff="${{ff}}_`basename $f | cut -d . -f 1`" 101 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 102 source $VO_CMS_SW_DIR/cmsset_default.sh 103 for try in `seq 1 3`; do 104 echo "Stageout try $try" 105 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}" 106 /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm} 107 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root" 108 /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root 109 if [ $? -ne 0 ]; then 110 echo "ERROR: remote copy failed for file $ff" 113 echo "remote copy succeeded" 114 remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3) 115 locsize=$(cat `pwd`/$f | wc -c) 116 ok=$(($remsize==$locsize)) 117 if [ $ok -ne 1 ]; then 118 echo "Problem with copy (file sizes don't match), will retry in 30s" 124 echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url 128 cp -r Loop/* $LS_SUBCWD 129 if [ $? -ne 0 ]; then 130 echo 'ERROR: problem copying job directory back' 132 echo 'job directory copy succeeded' 135 idx = jobDir[jobDir.find(
"_Chunk")+6:].
strip(
"/")
if '_Chunk' in jobDir
else 'all',
136 srm = (
""+remoteDir+jobDir[ jobDir.rfind(
"/") : (jobDir.find(
"_Chunk")
if '_Chunk' in jobDir
else len(jobDir)) ]).
replace(
"root://eoscms.cern.ch/",
"")
139 print(
"chosen location not supported yet: ", remoteDir)
140 print(
'path must start with /store/')
143 script =
"""#!/bin/bash 148 # ulimit -v 3000000 # NO 149 echo 'copying job dir to worker' 151 eval `scramv1 ru -sh` 153 # eval `scramv1 ru -sh` 157 cd `find . -type d | grep /` 159 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 168 '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system''' 170 cmssw_release = os.environ[
'CMSSW_BASE']
171 VO_CMS_SW_DIR =
"/swshare/cms" 174 cpCmd=
"""echo 'sending the job directory back' 175 rm Loop/cmsswPreProcessing.root 176 cp -r Loop/* $SUBMISIONDIR""" 177 elif remoteDir.startswith(
"/pnfs/psi.ch"):
178 cpCmd=
"""echo 'sending root files to remote dir' 179 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools 180 for f in Loop/mt2*.root 182 ff=`basename $f | cut -d . -f 1` 183 #d=`echo $f | cut -d / -f 2` 185 echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root" 186 gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root 187 if [ $? -ne 0 ]; then 188 echo "ERROR: remote copy failed for file $ff" 190 echo "remote copy succeeded" 194 rm Loop/cmsswPreProcessing.root 195 cp -r Loop/* $SUBMISIONDIR""".
format(idx=index, srm=
'srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind(
"/"):jobDir.find(
"_Chunk")])
197 print(
"remote directory not supported yet: ", remoteDir)
198 print(
'path must start with "/pnfs/psi.ch"')
202 script =
"""#!/bin/bash 204 ##### MONITORING/DEBUG INFORMATION ############################### 205 DATE_START=`date +%s` 206 echo "Job started at " `date` 208 ################################################################ 209 ## QUEUEING SYSTEM SETTINGS: 219 echo "######## Environment Variables ##########" 221 echo "################################################################" 222 TOPWORKDIR=/scratch/`whoami` 223 JOBDIR=sgejob-$JOB_ID 224 WORKDIR=$TOPWORKDIR/$JOBDIR 226 if test -e "$WORKDIR"; then 227 echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2 231 if test ! -d "$WORKDIR"; then 232 echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2 236 #source $VO_CMS_SW_DIR/cmsset_default.sh 237 source {vo}/cmsset_default.sh 238 export SCRAM_ARCH=slc6_amd64_gcc481 241 shopt -s expand_aliases 244 cp -rf $SUBMISIONDIR . 246 cd `find . -type d | grep /` 248 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 249 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck 252 ########################################################################### 254 RUNTIME=$((DATE_END-DATE_START)) 255 echo "################################################################" 256 echo "Job finished at " `date` 257 echo "Wallclock running time: $RUNTIME s" 259 """.
format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
264 '''prepare a IC version of the batch script''' 267 cmssw_release = os.environ[
'CMSSW_BASE']
268 script =
"""#!/bin/bash 269 export X509_USER_PROXY=/home/hep/$USER/myproxy 270 source /vols/cms/grid/setup.sh 273 eval `scramv1 ru -sh` 276 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 278 echo 'sending the job directory back' 279 mv Loop/* ./ && rm -r Loop 280 """.
format(jobdir = jobDir,cmssw = cmssw_release)
284 '''prepare a local version of the batch script, to run using nohup''' 286 script =
"""#!/bin/bash 288 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 290 echo 'sending the job directory back' 297 '''Batch manager specific to cmsRun processes.''' 300 '''Prepare one job. This function is called by the base class.''' 302 print(components[value])
305 scriptFileName = jobDir+
'/batchScript.sh' 306 scriptFile = open(scriptFileName,
'w')
307 storeDir = self.remoteOutputDir_.replace(
'/castor/cern.ch/cms',
'')
308 mode = self.RunningMode(options.batch)
312 scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) )
313 elif mode ==
'LOCAL':
315 elif mode ==
'PISA' :
317 elif mode ==
'PADOVA' :
322 os.system(
'chmod +x %s' % scriptFileName)
324 shutil.copyfile(cfgFileName, jobDir+
'/pycfg.py')
327 cfgFile = open(jobDir+
'/config.pck',
'w')
328 pickle.dump( components[value] , cfgFile )
331 if hasattr(self,
"heppyOptions_"):
332 optjsonfile = open(jobDir+
'/options.json',
'w')
333 optjsonfile.write(json.dumps(self.heppyOptions_))
336 if __name__ ==
'__main__':
338 batchManager.parser_.usage=
""" 339 %prog [options] <cfgFile> 341 Run Colin's python analysis system on the batch. 342 Job splitting is determined by your configuration file. 345 options, args = batchManager.ParseOptions()
347 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
348 for opt
in options.extraOptions:
350 (key,val) = opt.split(
"=",1)
351 _heppyGlobalOptions[key] = val
353 _heppyGlobalOptions[opt] =
True 354 batchManager.heppyOptions_=_heppyGlobalOptions
356 cfgFileName = args[0]
358 handle = open(cfgFileName,
'r') 360 cfo = imp.load_source(
"pycfg", cfgFileName, handle)
364 components =
split( [comp
for comp
in config.components
if len(comp.files)>0] )
365 listOfValues = range(0, len(components))
366 listOfNames = [comp.name
for comp
in components]
368 batchManager.PrepareJobs( listOfValues, listOfNames )
370 batchManager.SubmitJobs( waitingTime )
def batchScriptPADOVA(index, jobDir='./')
def batchScriptIC(jobDir)
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
def batchScriptPSI(index, jobDir, remoteDir='')
S & print(S &os, JobReport::InputFile const &f)
def PrepareJobUser(self, jobDir, value)
def batchScriptLocal(remoteDir, index)
def batchScriptCERN(jobDir, remoteDir='')