3 from __future__
import print_function
4 from builtins
import range
13 from PhysicsTools.HeppyCore.utils.batchmanager
import BatchManager
15 from PhysicsTools.HeppyCore.framework.heppy_loop
import split
18 '''prepare the LSF version of the batch script, to run on LSF'''
19 script =
"""#!/bin/bash
26 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
27 source $VO_CMS_SW_DIR/cmsset_default.sh
32 # ulimit -v 3000000 # NO
33 echo 'copying job dir to worker'
34 eval `scram runtime -sh`
37 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
40 #echo 'sending the job directory back'
41 #echo cp -r Loop/* $LS_SUBCWD
47 '''prepare the LSF version of the batch script, to run on LSF'''
48 script =
"""#!/bin/bash
52 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
53 source $VO_CMS_SW_DIR/cmsset_default.sh
58 # ulimit -v 3000000 # NO
59 echo 'copying job dir to worker'
61 eval `scramv1 runtime -sh`
62 #eval `scramv1 ru -sh`
64 # eval `scramv1 ru -sh`
68 echo `find . -type d | grep /`
70 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
73 #echo 'sending the job directory back'
74 #echo cp -r Loop/* $LS_SUBCWD
79 '''prepare the LSF version of the batch script, to run on LSF'''
81 dirCopy =
"""echo 'sending the logs back' # will send also root files if copy failed
82 rm Loop/cmsswPreProcessing.root
83 cp -r Loop/* $LS_SUBCWD
85 echo 'ERROR: problem copying job directory back'
87 echo 'job directory copy succeeded'
92 elif remoteDir.startswith(
"root://eoscms.cern.ch//eos/cms/store/"):
93 cpCmd=
"""echo 'sending root files to remote dir'
94 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
95 for f in Loop/*/tree*.root
97 rm Loop/cmsswPreProcessing.root
98 ff=`echo $f | cut -d/ -f2`
99 ff="${{ff}}_`basename $f | cut -d . -f 1`"
102 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
103 source $VO_CMS_SW_DIR/cmsset_default.sh
104 for try in `seq 1 3`; do
105 echo "Stageout try $try"
106 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
107 /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
108 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
109 /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
110 if [ $? -ne 0 ]; then
111 echo "ERROR: remote copy failed for file $ff"
114 echo "remote copy succeeded"
115 remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
116 locsize=$(cat `pwd`/$f | wc -c)
117 ok=$(($remsize==$locsize))
118 if [ $ok -ne 1 ]; then
119 echo "Problem with copy (file sizes don't match), will retry in 30s"
125 echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
129 cp -r Loop/* $LS_SUBCWD
130 if [ $? -ne 0 ]; then
131 echo 'ERROR: problem copying job directory back'
133 echo 'job directory copy succeeded'
136 idx = jobDir[jobDir.find(
"_Chunk")+6:].
strip(
"/")
if '_Chunk' in jobDir
else 'all',
137 srm = (
""+remoteDir+jobDir[ jobDir.rfind(
"/") : (jobDir.find(
"_Chunk")
if '_Chunk' in jobDir
else len(jobDir)) ]).
replace(
"root://eoscms.cern.ch/",
"")
140 print(
"chosen location not supported yet: ", remoteDir)
141 print(
'path must start with /store/')
144 script =
"""#!/bin/bash
149 # ulimit -v 3000000 # NO
150 echo 'copying job dir to worker'
152 eval `scramv1 ru -sh`
154 # eval `scramv1 ru -sh`
158 cd `find . -type d | grep /`
160 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
169 '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system'''
171 cmssw_release = os.environ[
'CMSSW_BASE']
172 VO_CMS_SW_DIR =
"/swshare/cms"
175 cpCmd=
"""echo 'sending the job directory back'
176 rm Loop/cmsswPreProcessing.root
177 cp -r Loop/* $SUBMISIONDIR"""
178 elif remoteDir.startswith(
"/pnfs/psi.ch"):
179 cpCmd=
"""echo 'sending root files to remote dir'
180 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
181 for f in Loop/mt2*.root
183 ff=`basename $f | cut -d . -f 1`
184 #d=`echo $f | cut -d / -f 2`
186 echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
187 gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
188 if [ $? -ne 0 ]; then
189 echo "ERROR: remote copy failed for file $ff"
191 echo "remote copy succeeded"
195 rm Loop/cmsswPreProcessing.root
196 cp -r Loop/* $SUBMISIONDIR""".
format(idx=index, srm=
'srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind(
"/"):jobDir.find(
"_Chunk")])
198 print(
"remote directory not supported yet: ", remoteDir)
199 print(
'path must start with "/pnfs/psi.ch"')
203 script =
"""#!/bin/bash
205 ##### MONITORING/DEBUG INFORMATION ###############################
206 DATE_START=`date +%s`
207 echo "Job started at " `date`
209 ################################################################
210 ## QUEUEING SYSTEM SETTINGS:
220 echo "######## Environment Variables ##########"
222 echo "################################################################"
223 TOPWORKDIR=/scratch/`whoami`
224 JOBDIR=sgejob-$JOB_ID
225 WORKDIR=$TOPWORKDIR/$JOBDIR
227 if test -e "$WORKDIR"; then
228 echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
232 if test ! -d "$WORKDIR"; then
233 echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
237 #source $VO_CMS_SW_DIR/cmsset_default.sh
238 source {vo}/cmsset_default.sh
239 export SCRAM_ARCH=slc6_amd64_gcc481
242 shopt -s expand_aliases
245 cp -rf $SUBMISIONDIR .
247 cd `find . -type d | grep /`
249 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
250 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
253 ###########################################################################
255 RUNTIME=$((DATE_END-DATE_START))
256 echo "################################################################"
257 echo "Job finished at " `date`
258 echo "Wallclock running time: $RUNTIME s"
260 """.
format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
265 '''prepare an IC version of the batch script'''
268 cmssw_release = os.environ[
'CMSSW_BASE']
269 script =
"""#!/bin/bash
270 export X509_USER_PROXY=/home/hep/$USER/myproxy
271 source /vols/cms/grid/setup.sh
274 eval `scramv1 ru -sh`
277 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
279 echo 'sending the job directory back'
280 mv Loop/* ./ && rm -r Loop
281 """.
format(jobdir = jobDir,cmssw = cmssw_release)
285 '''prepare a local version of the batch script, to run using nohup'''
287 script =
"""#!/bin/bash
289 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
291 echo 'sending the job directory back'
298 '''Batch manager specific to cmsRun processes.'''
301 '''Prepare one job. This function is called by the base class.'''
303 print(components[value])
306 scriptFileName = jobDir+
'/batchScript.sh'
307 scriptFile = open(scriptFileName,
'w')
308 storeDir = self.remoteOutputDir_.
replace(
'/castor/cern.ch/cms',
'')
309 mode = self.RunningMode(options.batch)
313 scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) )
314 elif mode ==
'LOCAL':
316 elif mode ==
'PISA' :
318 elif mode ==
'PADOVA' :
323 os.system(
'chmod +x %s' % scriptFileName)
325 shutil.copyfile(cfgFileName, jobDir+
'/pycfg.py')
328 cfgFile = open(jobDir+
'/config.pck',
'w')
329 pickle.dump( components[value] , cfgFile )
332 if hasattr(self,
"heppyOptions_"):
333 optjsonfile = open(jobDir+
'/options.json',
'w')
334 optjsonfile.write(json.dumps(self.heppyOptions_))
337 if __name__ ==
'__main__':
339 batchManager.parser_.usage=
"""
340 %prog [options] <cfgFile>
342 Run Colin's python analysis system on the batch.
343 Job splitting is determined by your configuration file.
346 options, args = batchManager.ParseOptions()
348 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
349 for opt
in options.extraOptions:
351 (key,val) = opt.split(
"=",1)
352 _heppyGlobalOptions[key] = val
354 _heppyGlobalOptions[opt] =
True
355 batchManager.heppyOptions_=_heppyGlobalOptions
357 cfgFileName = args[0]
359 handle = open(cfgFileName,
'r')
361 cfo = imp.load_source(
"pycfg", cfgFileName, handle)
365 components =
split( [comp
for comp
in config.components
if len(comp.files)>0] )
366 listOfValues = list(
range(0, len(components)))
367 listOfNames = [comp.name
for comp
in components]
369 batchManager.PrepareJobs( listOfValues, listOfNames )
371 batchManager.SubmitJobs( waitingTime )