3 from __future__
import print_function
4 from builtins
import range
13 from PhysicsTools.HeppyCore.utils.batchmanager
import BatchManager
15 from PhysicsTools.HeppyCore.framework.heppy_loop
import split
18 '''prepare the LSF version of the batch script, to run on LSF''' 19 script =
"""#!/bin/bash 26 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 27 source $VO_CMS_SW_DIR/cmsset_default.sh 32 # ulimit -v 3000000 # NO 33 echo 'copying job dir to worker' 34 eval `scram runtime -sh` 37 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output 40 #echo 'sending the job directory back' 41 #echo cp -r Loop/* $LS_SUBCWD 47 '''prepare the LSF version of the batch script, to run on LSF''' 48 script =
"""#!/bin/bash 52 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 53 source $VO_CMS_SW_DIR/cmsset_default.sh 58 # ulimit -v 3000000 # NO 59 echo 'copying job dir to worker' 61 eval `scramv1 runtime -sh` 62 #eval `scramv1 ru -sh` 64 # eval `scramv1 ru -sh` 68 echo `find . -type d | grep /` 70 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output 73 #echo 'sending the job directory back' 74 #echo cp -r Loop/* $LS_SUBCWD 79 '''prepare the LSF version of the batch script, to run on LSF''' 81 dirCopy =
"""echo 'sending the logs back' # will send also root files if copy failed 82 rm Loop/cmsswPreProcessing.root 83 cp -r Loop/* $LS_SUBCWD 85 echo 'ERROR: problem copying job directory back' 87 echo 'job directory copy succeeded' 92 elif remoteDir.startswith(
"root://eoscms.cern.ch//eos/cms/store/"):
93 cpCmd=
"""echo 'sending root files to remote dir' 94 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH # 95 for f in Loop/*/tree*.root 97 rm Loop/cmsswPreProcessing.root 98 ff=`echo $f | cut -d/ -f2` 99 ff="${{ff}}_`basename $f | cut -d . -f 1`" 102 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch 103 source $VO_CMS_SW_DIR/cmsset_default.sh 104 for try in `seq 1 3`; do 105 echo "Stageout try $try" 106 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}" 107 /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm} 108 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root" 109 /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root 110 if [ $? -ne 0 ]; then 111 echo "ERROR: remote copy failed for file $ff" 114 echo "remote copy succeeded" 115 remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3) 116 locsize=$(cat `pwd`/$f | wc -c) 117 ok=$(($remsize==$locsize)) 118 if [ $ok -ne 1 ]; then 119 echo "Problem with copy (file sizes don't match), will retry in 30s" 125 echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url 129 cp -r Loop/* $LS_SUBCWD 130 if [ $? -ne 0 ]; then 131 echo 'ERROR: problem copying job directory back' 133 echo 'job directory copy succeeded' 136 idx = jobDir[jobDir.find(
"_Chunk")+6:].
strip(
"/")
if '_Chunk' in jobDir
else 'all',
137 srm = (
""+remoteDir+jobDir[ jobDir.rfind(
"/") : (jobDir.find(
"_Chunk")
if '_Chunk' in jobDir
else len(jobDir)) ]).
replace(
"root://eoscms.cern.ch/",
"")
140 print(
"chosen location not supported yet: ", remoteDir)
141 print(
'path must start with /store/')
144 script =
"""#!/bin/bash 149 # ulimit -v 3000000 # NO 150 echo 'copying job dir to worker' 152 eval `scramv1 ru -sh` 154 # eval `scramv1 ru -sh` 158 cd `find . -type d | grep /` 160 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 169 '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system''' 171 cmssw_release = os.environ[
'CMSSW_BASE']
172 VO_CMS_SW_DIR =
"/swshare/cms" 175 cpCmd=
"""echo 'sending the job directory back' 176 rm Loop/cmsswPreProcessing.root 177 cp -r Loop/* $SUBMISIONDIR""" 178 elif remoteDir.startswith(
"/pnfs/psi.ch"):
179 cpCmd=
"""echo 'sending root files to remote dir' 180 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools 181 for f in Loop/mt2*.root 183 ff=`basename $f | cut -d . -f 1` 184 #d=`echo $f | cut -d / -f 2` 186 echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root" 187 gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root 188 if [ $? -ne 0 ]; then 189 echo "ERROR: remote copy failed for file $ff" 191 echo "remote copy succeeded" 195 rm Loop/cmsswPreProcessing.root 196 cp -r Loop/* $SUBMISIONDIR""".
format(idx=index, srm=
'srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind(
"/"):jobDir.find(
"_Chunk")])
198 print(
"remote directory not supported yet: ", remoteDir)
199 print(
'path must start with "/pnfs/psi.ch"')
203 script =
"""#!/bin/bash 205 ##### MONITORING/DEBUG INFORMATION ############################### 206 DATE_START=`date +%s` 207 echo "Job started at " `date` 209 ################################################################ 210 ## QUEUEING SYSTEM SETTINGS: 220 echo "######## Environment Variables ##########" 222 echo "################################################################" 223 TOPWORKDIR=/scratch/`whoami` 224 JOBDIR=sgejob-$JOB_ID 225 WORKDIR=$TOPWORKDIR/$JOBDIR 227 if test -e "$WORKDIR"; then 228 echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2 232 if test ! -d "$WORKDIR"; then 233 echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2 237 #source $VO_CMS_SW_DIR/cmsset_default.sh 238 source {vo}/cmsset_default.sh 239 export SCRAM_ARCH=slc6_amd64_gcc481 242 shopt -s expand_aliases 245 cp -rf $SUBMISIONDIR . 247 cd `find . -type d | grep /` 249 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 250 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck 253 ########################################################################### 255 RUNTIME=$((DATE_END-DATE_START)) 256 echo "################################################################" 257 echo "Job finished at " `date` 258 echo "Wallclock running time: $RUNTIME s" 260 """.
format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
265 '''prepare a IC version of the batch script''' 268 cmssw_release = os.environ[
'CMSSW_BASE']
269 script =
"""#!/bin/bash 270 export X509_USER_PROXY=/home/hep/$USER/myproxy 271 source /vols/cms/grid/setup.sh 274 eval `scramv1 ru -sh` 277 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 279 echo 'sending the job directory back' 280 mv Loop/* ./ && rm -r Loop 281 """.
format(jobdir = jobDir,cmssw = cmssw_release)
285 '''prepare a local version of the batch script, to run using nohup''' 287 script =
"""#!/bin/bash 289 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json 291 echo 'sending the job directory back' 298 '''Batch manager specific to cmsRun processes.''' 301 '''Prepare one job. This function is called by the base class.''' 303 print(components[value])
306 scriptFileName = jobDir+
'/batchScript.sh' 307 scriptFile = open(scriptFileName,
'w')
308 storeDir = self.remoteOutputDir_.
replace(
'/castor/cern.ch/cms',
'')
309 mode = self.RunningMode(options.batch)
313 scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) )
314 elif mode ==
'LOCAL':
316 elif mode ==
'PISA' :
318 elif mode ==
'PADOVA' :
323 os.system(
'chmod +x %s' % scriptFileName)
325 shutil.copyfile(cfgFileName, jobDir+
'/pycfg.py')
328 cfgFile = open(jobDir+
'/config.pck',
'w')
329 pickle.dump( components[value] , cfgFile )
332 if hasattr(self,
"heppyOptions_"):
333 optjsonfile = open(jobDir+
'/options.json',
'w')
334 optjsonfile.write(json.dumps(self.heppyOptions_))
337 if __name__ ==
'__main__':
339 batchManager.parser_.usage=
""" 340 %prog [options] <cfgFile> 342 Run Colin's python analysis system on the batch. 343 Job splitting is determined by your configuration file. 346 options, args = batchManager.ParseOptions()
348 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
349 for opt
in options.extraOptions:
351 (key,val) = opt.split(
"=",1)
352 _heppyGlobalOptions[key] = val
354 _heppyGlobalOptions[opt] =
True 355 batchManager.heppyOptions_=_heppyGlobalOptions
357 cfgFileName = args[0]
359 handle = open(cfgFileName,
'r') 361 cfo = imp.load_source(
"pycfg", cfgFileName, handle)
365 components =
split( [comp
for comp
in config.components
if len(comp.files)>0] )
366 listOfValues = list(
range(0, len(components)))
367 listOfNames = [comp.name
for comp
in components]
369 batchManager.PrepareJobs( listOfValues, listOfNames )
371 batchManager.SubmitJobs( waitingTime )
def batchScriptPADOVA(index, jobDir='./')
def batchScriptIC(jobDir)
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
def batchScriptPSI(index, jobDir, remoteDir='')
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def PrepareJobUser(self, jobDir, value)
def split(sequence, size)
def batchScriptLocal(remoteDir, index)
def batchScriptCERN(jobDir, remoteDir='')