11 from PhysicsTools.HeppyCore.utils.batchmanager
import BatchManager
13 from PhysicsTools.HeppyCore.framework.heppy_loop
import split
16 '''prepare the LSF version of the batch script, to run on LSF'''
17 script =
"""#!/bin/bash
24 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
25 source $VO_CMS_SW_DIR/cmsset_default.sh
30 # ulimit -v 3000000 # NO
31 echo 'copying job dir to worker'
32 eval `scram runtime -sh`
35 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
38 #echo 'sending the job directory back'
39 #echo cp -r Loop/* $LS_SUBCWD
45 '''prepare the LSF version of the batch script, to run on LSF'''
46 script =
"""#!/bin/bash
50 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
51 source $VO_CMS_SW_DIR/cmsset_default.sh
56 # ulimit -v 3000000 # NO
57 echo 'copying job dir to worker'
59 eval `scramv1 runtime -sh`
60 #eval `scramv1 ru -sh`
62 # eval `scramv1 ru -sh`
66 echo `find . -type d | grep /`
68 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
71 #echo 'sending the job directory back'
72 #echo cp -r Loop/* $LS_SUBCWD
77 '''prepare the LSF version of the batch script, to run on LSF'''
79 dirCopy =
"""echo 'sending the logs back' # will send also root files if copy failed
80 rm Loop/cmsswPreProcessing.root
81 cp -r Loop/* $LS_SUBCWD
83 echo 'ERROR: problem copying job directory back'
85 echo 'job directory copy succeeded'
90 elif remoteDir.startswith(
"root://eoscms.cern.ch//eos/cms/store/"):
91 cpCmd=
"""echo 'sending root files to remote dir'
92 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
93 for f in Loop/*/tree*.root
95 rm Loop/cmsswPreProcessing.root
96 ff=`echo $f | cut -d/ -f2`
97 ff="${{ff}}_`basename $f | cut -d . -f 1`"
100 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
101 source $VO_CMS_SW_DIR/cmsset_default.sh
102 for try in `seq 1 3`; do
103 echo "Stageout try $try"
104 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
105 /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
106 echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
107 /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
108 if [ $? -ne 0 ]; then
109 echo "ERROR: remote copy failed for file $ff"
112 echo "remote copy succeeded"
113 remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
114 locsize=$(cat `pwd`/$f | wc -c)
115 ok=$(($remsize==$locsize))
116 if [ $ok -ne 1 ]; then
117 echo "Problem with copy (file sizes don't match), will retry in 30s"
123 echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
127 cp -r Loop/* $LS_SUBCWD
128 if [ $? -ne 0 ]; then
129 echo 'ERROR: problem copying job directory back'
131 echo 'job directory copy succeeded'
134 idx = jobDir[jobDir.find(
"_Chunk")+6:].strip(
"/")
if '_Chunk' in jobDir
else 'all',
135 srm = (
""+remoteDir+jobDir[ jobDir.rfind(
"/") : (jobDir.find(
"_Chunk")
if '_Chunk' in jobDir
else len(jobDir)) ]).
replace(
"root://eoscms.cern.ch/",
"")
138 print "chosen location not supported yet: ", remoteDir
139 print 'path must start with /store/'
142 script =
"""#!/bin/bash
147 # ulimit -v 3000000 # NO
148 echo 'copying job dir to worker'
150 eval `scramv1 ru -sh`
152 # eval `scramv1 ru -sh`
156 cd `find . -type d | grep /`
158 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
167 '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system'''
169 cmssw_release = os.environ[
'CMSSW_BASE']
170 VO_CMS_SW_DIR =
"/swshare/cms"
173 cpCmd=
"""echo 'sending the job directory back'
174 rm Loop/cmsswPreProcessing.root
175 cp -r Loop/* $SUBMISIONDIR"""
176 elif remoteDir.startswith(
"/pnfs/psi.ch"):
177 cpCmd=
"""echo 'sending root files to remote dir'
178 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
179 for f in Loop/mt2*.root
181 ff=`basename $f | cut -d . -f 1`
182 #d=`echo $f | cut -d / -f 2`
184 echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
185 gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
186 if [ $? -ne 0 ]; then
187 echo "ERROR: remote copy failed for file $ff"
189 echo "remote copy succeeded"
193 rm Loop/cmsswPreProcessing.root
194 cp -r Loop/* $SUBMISIONDIR""".
format(idx=index, srm=
'srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind(
"/"):jobDir.find(
"_Chunk")])
196 print "remote directory not supported yet: ", remoteDir
197 print 'path must start with "/pnfs/psi.ch"'
201 script =
"""#!/bin/bash
203 ##### MONITORING/DEBUG INFORMATION ###############################
204 DATE_START=`date +%s`
205 echo "Job started at " `date`
207 ################################################################
208 ## QUEUEING SYSTEM SETTINGS:
218 echo "######## Environment Variables ##########"
220 echo "################################################################"
221 TOPWORKDIR=/scratch/`whoami`
222 JOBDIR=sgejob-$JOB_ID
223 WORKDIR=$TOPWORKDIR/$JOBDIR
225 if test -e "$WORKDIR"; then
226 echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
230 if test ! -d "$WORKDIR"; then
231 echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
235 #source $VO_CMS_SW_DIR/cmsset_default.sh
236 source {vo}/cmsset_default.sh
237 export SCRAM_ARCH=slc6_amd64_gcc481
240 shopt -s expand_aliases
243 cp -rf $SUBMISIONDIR .
245 cd `find . -type d | grep /`
247 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
248 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
251 ###########################################################################
253 RUNTIME=$((DATE_END-DATE_START))
254 echo "################################################################"
255 echo "Job finished at " `date`
256 echo "Wallclock running time: $RUNTIME s"
258 """.
format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
263 '''prepare a IC version of the batch script'''
266 cmssw_release = os.environ[
'CMSSW_BASE']
267 script =
"""#!/bin/bash
268 export X509_USER_PROXY=/home/hep/$USER/myproxy
269 source /vols/cms/grid/setup.sh
272 eval `scramv1 ru -sh`
275 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
277 echo 'sending the job directory back'
278 mv Loop/* ./ && rm -r Loop
279 """.
format(jobdir = jobDir,cmssw = cmssw_release)
283 '''prepare a local version of the batch script, to run using nohup'''
285 script =
"""#!/bin/bash
287 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
289 echo 'sending the job directory back'
296 '''Batch manager specific to cmsRun processes.'''
299 '''Prepare one job. This function is called by the base class.'''
301 print components[value]
304 scriptFileName = jobDir+
'/batchScript.sh'
305 scriptFile = open(scriptFileName,
'w')
306 storeDir = self.remoteOutputDir_.replace(
'/castor/cern.ch/cms',
'')
307 mode = self.RunningMode(options.batch)
311 scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) )
312 elif mode ==
'LOCAL':
314 elif mode ==
'PISA' :
316 elif mode ==
'PADOVA' :
321 os.system(
'chmod +x %s' % scriptFileName)
323 shutil.copyfile(cfgFileName, jobDir+
'/pycfg.py')
326 cfgFile = open(jobDir+
'/config.pck',
'w')
327 pickle.dump( components[value] , cfgFile )
330 if hasattr(self,
"heppyOptions_"):
331 optjsonfile = open(jobDir+
'/options.json',
'w')
332 optjsonfile.write(json.dumps(self.heppyOptions_))
335 if __name__ ==
'__main__':
337 batchManager.parser_.usage=
"""
338 %prog [options] <cfgFile>
340 Run Colin's python analysis system on the batch.
341 Job splitting is determined by your configuration file.
344 options, args = batchManager.ParseOptions()
346 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
347 for opt
in options.extraOptions:
349 (key,val) = opt.split(
"=",1)
350 _heppyGlobalOptions[key] = val
352 _heppyGlobalOptions[opt] =
True
353 batchManager.heppyOptions_=_heppyGlobalOptions
355 cfgFileName = args[0]
357 handle = open(cfgFileName,
'r')
359 cfo = imp.load_source(
"pycfg", cfgFileName, handle)
363 components =
split( [comp
for comp
in config.components
if len(comp.files)>0] )
364 listOfValues = range(0, len(components))
365 listOfNames = [comp.name
for comp
in components]
367 batchManager.PrepareJobs( listOfValues, listOfNames )
369 batchManager.SubmitJobs( waitingTime )