CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 import sys
4 import imp
5 import copy
6 import os
7 import shutil
8 import pickle
9 import math
10 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
11 
12 from PhysicsTools.HeppyCore.framework.heppy import split
13 
14 
15 def batchScriptPISA( index, remoteDir=''):
16  '''prepare the LSF version of the batch script, to run on LSF'''
17  script = """#!/bin/bash
18 #BSUB -q cms
19 echo 'PWD:'
20 pwd
21 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
22 source $VO_CMS_SW_DIR/cmsset_default.sh
23 echo 'environment:'
24 echo
25 env > local.env
26 env
27 # ulimit -v 3000000 # NO
28 echo 'copying job dir to worker'
29 ###cd $CMSSW_BASE/src
30 eval `scramv1 runtime -sh`
31 #eval `scramv1 ru -sh`
32 # cd $LS_SUBCWD
33 # eval `scramv1 ru -sh`
34 ##cd -
35 ##cp -rf $LS_SUBCWD .
36 ls
37 echo `find . -type d | grep /`
38 echo 'running'
39 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck >& local.output
40 exit $?
41 #echo
42 #echo 'sending the job directory back'
43 #echo cp -r Loop/* $LS_SUBCWD
44 """
45  return script
46 
47 
48 def batchScriptCERN( index, remoteDir=''):
49  '''prepare the LSF version of the batch script, to run on LSF'''
50  script = """#!/bin/bash
51 #BSUB -q 8nm
52 echo 'environment:'
53 echo
54 env
55 # ulimit -v 3000000 # NO
56 echo 'copying job dir to worker'
57 cd $CMSSW_BASE/src
58 eval `scramv1 ru -sh`
59 # cd $LS_SUBCWD
60 # eval `scramv1 ru -sh`
61 cd -
62 cp -rf $LS_SUBCWD .
63 ls
64 cd `find . -type d | grep /`
65 echo 'running'
66 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck
67 echo
68 echo 'sending the job directory back'
69 cp -r Loop/* $LS_SUBCWD
70 """
71  return script
72 
73 
74 def batchScriptPSI( index, jobDir, remoteDir=''):
75  '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system'''
76 
77  cmssw_release = os.environ['CMSSW_BASE']
78  VO_CMS_SW_DIR = "/swshare/cms" # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn
79 
80  if remoteDir=='':
81  cpCmd="""echo 'sending the job directory back'
82 cp -r Loop/* $SUBMISIONDIR"""
83  elif remoteDir.startswith("/pnfs/psi.ch"):
84  cpCmd="""echo 'sending root files to remote dir'
85 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
86 for f in Loop/treeProducerSusyFullHad/*.root
87 do
88 echo $f
89 ff=`basename $f | cut -d . -f 1`
90 echo $ff
91 gfal-mkdir {srm}
92 echo "gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$file.root {srm}/${{ff}}_{idx}.root"
93 gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$ff.root {srm}/${{ff}}_{idx}.root
94 done
95 rm Loop/treeProducerSusyFullHad/*.root
96 echo 'sending the logs back'
97 cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm='srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind("/"):jobDir.find("_Chunk")])
98  else:
99  print "remote directory not supported yet: ", remoteDir
100  print 'path must start with "/pnfs/psi.ch"'
101  sys.exit(1)
102 
103 
104  script = """#!/bin/bash
105 shopt expand_aliases
106 ##### MONITORING/DEBUG INFORMATION ###############################
107 DATE_START=`date +%s`
108 echo "Job started at " `date`
109 cat <<EOF
110 ################################################################
111 ## QUEUEING SYSTEM SETTINGS:
112 HOME=$HOME
113 USER=$USER
114 JOB_ID=$JOB_ID
115 JOB_NAME=$JOB_NAME
116 HOSTNAME=$HOSTNAME
117 TASK_ID=$TASK_ID
118 QUEUE=$QUEUE
119 
120 EOF
121 echo "######## Environment Variables ##########"
122 env
123 echo "################################################################"
124 TOPWORKDIR=/scratch/`whoami`
125 JOBDIR=sgejob-$JOB_ID
126 WORKDIR=$TOPWORKDIR/$JOBDIR
127 SUBMISIONDIR={jdir}
128 if test -e "$WORKDIR"; then
129  echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
130  exit 1
131 fi
132 mkdir -p $WORKDIR
133 if test ! -d "$WORKDIR"; then
134  echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
135  exit 1
136 fi
137 
138 #source $VO_CMS_SW_DIR/cmsset_default.sh
139 source {vo}/cmsset_default.sh
140 export SCRAM_ARCH=slc6_amd64_gcc481
141 #cd $CMSSW_BASE/src
142 cd {cmssw}/src
143 shopt -s expand_aliases
144 cmsenv
145 cd $WORKDIR
146 cp -rf $SUBMISIONDIR .
147 ls
148 cd `find . -type d | grep /`
149 echo 'running'
150 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
151 python {cmssw}/src/CMGTools/RootTools/python/fwlite/looper.py pycfg.py config.pck
152 echo
153 {copy}
154 ###########################################################################
155 DATE_END=`date +%s`
156 RUNTIME=$((DATE_END-DATE_START))
157 echo "################################################################"
158 echo "Job finished at " `date`
159 echo "Wallclock running time: $RUNTIME s"
160 exit 0
161 """.format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
162 
163  return script
164 
165 
166 def batchScriptLocal( remoteDir, index ):
167  '''prepare a local version of the batch script, to run using nohup'''
168 
169  script = """#!/bin/bash
170 echo 'running'
171 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck echo
172 echo 'sending the job directory back'
173 mv Loop/* ./
174 """
175  return script
176 
177 
178 
179 class MyBatchManager( BatchManager ):
180  '''Batch manager specific to cmsRun processes.'''
181 
182  def PrepareJobUser(self, jobDir, value ):
183  '''Prepare one job. This function is called by the base class.'''
184  print value
185  print components[value]
186 
187  #prepare the batch script
188  scriptFileName = jobDir+'/batchScript.sh'
189  scriptFile = open(scriptFileName,'w')
190  storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
191  mode = self.RunningMode(options.batch)
192  if mode == 'LXPLUS':
193  scriptFile.write( batchScriptCERN( storeDir, value) ) # watch out arguments are swapped (although not used)
194  elif mode == 'PSI':
195  scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) ) # storeDir not implemented at the moment
196  elif mode == 'LOCAL':
197  scriptFile.write( batchScriptLocal( storeDir, value) ) # watch out arguments are swapped (although not used)
198  elif mode == 'PISA' :
199  scriptFile.write( batchScriptPISA( storeDir, value) )
200  scriptFile.close()
201  os.system('chmod +x %s' % scriptFileName)
202 
203  shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')
204 # jobConfig = copy.deepcopy(config)
205 # jobConfig.components = [ components[value] ]
206  cfgFile = open(jobDir+'/config.pck','w')
207  pickle.dump( components[value] , cfgFile )
208  # pickle.dump( cfo, cfgFile )
209  cfgFile.close()
210 
211 
212 if __name__ == '__main__':
213  batchManager = MyBatchManager()
214  batchManager.parser_.usage="""
215  %prog [options] <cfgFile>
216 
217  Run Colin's python analysis system on the batch.
218  Job splitting is determined by your configuration file.
219  """
220 
221  options, args = batchManager.ParseOptions()
222 
223  cfgFileName = args[0]
224 
225  handle = open(cfgFileName, 'r')
226  cfo = imp.load_source("pycfg", cfgFileName, handle)
227  config = cfo.config
228  handle.close()
229 
230  components = split( [comp for comp in config.components if len(comp.files)>0] )
231  listOfValues = range(0, len(components))
232  listOfNames = [comp.name for comp in components]
233 
234  batchManager.PrepareJobs( listOfValues, listOfNames )
235  waitingTime = 0.1
236  batchManager.SubmitJobs( waitingTime )
237 
def batchScriptCERN
Definition: heppy_batch.py:48
def batchScriptPISA
Definition: heppy_batch.py:15
def batchScriptLocal
Definition: heppy_batch.py:166
def batchScriptPSI
Definition: heppy_batch.py:74
double split
Definition: MVATrainer.cc:139