CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 import sys
4 import imp
5 import copy
6 import os
7 import shutil
8 import pickle
9 import math
10 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
11 
12 from PhysicsTools.HeppyCore.framework.heppy import split
13 
14 def batchScriptPADOVA( index, jobDir='./'):
15  '''prepare the LSF version of the batch script, to run on LSF'''
16  script = """#!/bin/bash
17 #BSUB -q local
18 #BSUB -J test
19 #BSUB -o test.log
20 cd {jdir}
21 echo 'PWD:'
22 pwd
23 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
24 source $VO_CMS_SW_DIR/cmsset_default.sh
25 echo 'environment:'
26 echo
27 env > local.env
28 env
29 # ulimit -v 3000000 # NO
30 echo 'copying job dir to worker'
31 eval `scram runtime -sh`
32 ls
33 echo 'running'
34 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck >& local.output
35 exit $?
36 #echo
37 #echo 'sending the job directory back'
38 #echo cp -r Loop/* $LS_SUBCWD
39 """.format(jdir=jobDir)
40 
41  return script
42 
43 def batchScriptPISA( index, remoteDir=''):
44  '''prepare the LSF version of the batch script, to run on LSF'''
45  script = """#!/bin/bash
46 #BSUB -q cms
47 echo 'PWD:'
48 pwd
49 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
50 source $VO_CMS_SW_DIR/cmsset_default.sh
51 echo 'environment:'
52 echo
53 env > local.env
54 env
55 # ulimit -v 3000000 # NO
56 echo 'copying job dir to worker'
57 ###cd $CMSSW_BASE/src
58 eval `scramv1 runtime -sh`
59 #eval `scramv1 ru -sh`
60 # cd $LS_SUBCWD
61 # eval `scramv1 ru -sh`
62 ##cd -
63 ##cp -rf $LS_SUBCWD .
64 ls
65 echo `find . -type d | grep /`
66 echo 'running'
67 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck >& local.output
68 exit $?
69 #echo
70 #echo 'sending the job directory back'
71 #echo cp -r Loop/* $LS_SUBCWD
72 """
73  return script
74 
75 def batchScriptCERN( jobDir, remoteDir=''):
76  '''prepare the LSF version of the batch script, to run on LSF'''
77 
78  dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
79 cp -r Loop/* $LS_SUBCWD
80 if [ $? -ne 0 ]; then
81  echo 'ERROR: problem copying job directory back'
82 else
83  echo 'job directory copy succeeded'
84 fi"""
85  if remoteDir=='':
86  cpCmd=dirCopy
87  elif remoteDir.startswith("/pnfs/psi.ch"):
88  cpCmd="""echo 'sending root files to remote dir'
89 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH # Fabio's workaround to fix gfal-tools with CMSSW
90 for f in Loop/mt2*.root
91 do
92  ff=`basename $f | cut -d . -f 1`
93  #d=`echo $f | cut -d / -f 2`
94  gfal-mkdir {srm}
95  echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
96  gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
97  if [ $? -ne 0 ]; then
98  echo "ERROR: remote copy failed for file $ff"
99  else
100  echo "remote copy succeeded"
101  rm Loop/$ff.root
102  fi
103 done
104 #fi
105 """.format(idx=jobDir[jobDir.find("_Chunk")+6:].strip("/"), srm='srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind("/"):jobDir.find("_Chunk")]) + dirCopy
106  else:
107  print "chosen location not supported yet: ", remoteDir
108  print 'path must start with "/pnfs/psi.ch"'
109  sys.exit(1)
110 
111  script = """#!/bin/bash
112 #BSUB -q 8nm
113 echo 'environment:'
114 echo
115 env | sort
116 # ulimit -v 3000000 # NO
117 echo 'copying job dir to worker'
118 cd $CMSSW_BASE/src
119 eval `scramv1 ru -sh`
120 # cd $LS_SUBCWD
121 # eval `scramv1 ru -sh`
122 cd -
123 cp -rf $LS_SUBCWD .
124 ls
125 cd `find . -type d | grep /`
126 echo 'running'
127 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck
128 echo
129 {copy}
130 """.format(copy=cpCmd)
131 
132  return script
133 
134 
135 def batchScriptPSI( index, jobDir, remoteDir=''):
136  '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system'''
137 
138  cmssw_release = os.environ['CMSSW_BASE']
139  VO_CMS_SW_DIR = "/swshare/cms" # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn
140 
141  if remoteDir=='':
142  cpCmd="""echo 'sending the job directory back'
143 cp -r Loop/* $SUBMISIONDIR"""
144  elif remoteDir.startswith("/pnfs/psi.ch"):
145  cpCmd="""echo 'sending root files to remote dir'
146 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
147 for f in Loop/treeProducerSusyFullHad/*.root
148 do
149 echo $f
150 ff=`basename $f | cut -d . -f 1`
151 echo $ff
152 gfal-mkdir {srm}
153 echo "gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$file.root {srm}/${{ff}}_{idx}.root"
154 gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$ff.root {srm}/${{ff}}_{idx}.root
155 done
156 rm Loop/treeProducerSusyFullHad/*.root
157 echo 'sending the logs back'
158 cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm='srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind("/"):jobDir.find("_Chunk")])
159  else:
160  print "remote directory not supported yet: ", remoteDir
161  print 'path must start with "/pnfs/psi.ch"'
162  sys.exit(1)
163 
164 
165  script = """#!/bin/bash
166 shopt expand_aliases
167 ##### MONITORING/DEBUG INFORMATION ###############################
168 DATE_START=`date +%s`
169 echo "Job started at " `date`
170 cat <<EOF
171 ################################################################
172 ## QUEUEING SYSTEM SETTINGS:
173 HOME=$HOME
174 USER=$USER
175 JOB_ID=$JOB_ID
176 JOB_NAME=$JOB_NAME
177 HOSTNAME=$HOSTNAME
178 TASK_ID=$TASK_ID
179 QUEUE=$QUEUE
180 
181 EOF
182 echo "######## Environment Variables ##########"
183 env
184 echo "################################################################"
185 TOPWORKDIR=/scratch/`whoami`
186 JOBDIR=sgejob-$JOB_ID
187 WORKDIR=$TOPWORKDIR/$JOBDIR
188 SUBMISIONDIR={jdir}
189 if test -e "$WORKDIR"; then
190  echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
191  exit 1
192 fi
193 mkdir -p $WORKDIR
194 if test ! -d "$WORKDIR"; then
195  echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
196  exit 1
197 fi
198 
199 #source $VO_CMS_SW_DIR/cmsset_default.sh
200 source {vo}/cmsset_default.sh
201 export SCRAM_ARCH=slc6_amd64_gcc481
202 #cd $CMSSW_BASE/src
203 cd {cmssw}/src
204 shopt -s expand_aliases
205 cmsenv
206 cd $WORKDIR
207 cp -rf $SUBMISIONDIR .
208 ls
209 cd `find . -type d | grep /`
210 echo 'running'
211 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
212 python {cmssw}/src/CMGTools/RootTools/python/fwlite/looper.py pycfg.py config.pck
213 echo
214 {copy}
215 ###########################################################################
216 DATE_END=`date +%s`
217 RUNTIME=$((DATE_END-DATE_START))
218 echo "################################################################"
219 echo "Job finished at " `date`
220 echo "Wallclock running time: $RUNTIME s"
221 exit 0
222 """.format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
223 
224  return script
225 
226 def batchScriptIC(jobDir):
227  '''prepare a IC version of the batch script'''
228 
229 
230  cmssw_release = os.environ['CMSSW_BASE']
231  script = """#!/bin/bash
232 export X509_USER_PROXY=/home/hep/$USER/myproxy
233 source /vols/cms/grid/setup.sh
234 cd {jobdir}
235 cd {cmssw}/src
236 eval `scramv1 ru -sh`
237 cd -
238 echo 'running'
239 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck
240 echo
241 echo 'sending the job directory back'
242 mv Loop/* ./ && rm -r Loop
243 """.format(jobdir = jobDir,cmssw = cmssw_release)
244  return script
245 
246 def batchScriptLocal( remoteDir, index ):
247  '''prepare a local version of the batch script, to run using nohup'''
248 
249  script = """#!/bin/bash
250 echo 'running'
251 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck echo
252 echo 'sending the job directory back'
253 mv Loop/* ./
254 """
255  return script
256 
257 
258 class MyBatchManager( BatchManager ):
259  '''Batch manager specific to cmsRun processes.'''
260 
261  def PrepareJobUser(self, jobDir, value ):
262  '''Prepare one job. This function is called by the base class.'''
263  print value
264  print components[value]
265 
266  #prepare the batch script
267  scriptFileName = jobDir+'/batchScript.sh'
268  scriptFile = open(scriptFileName,'w')
269  storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
270  mode = self.RunningMode(options.batch)
271  if mode == 'LXPLUS':
272  scriptFile.write( batchScriptCERN( jobDir, storeDir) )
273  elif mode == 'PSI':
274  scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) ) # storeDir not implemented at the moment
275  elif mode == 'LOCAL':
276  scriptFile.write( batchScriptLocal( storeDir, value) ) # watch out arguments are swapped (although not used)
277  elif mode == 'PISA' :
278  scriptFile.write( batchScriptPISA( storeDir, value) )
279  elif mode == 'PADOVA' :
280  scriptFile.write( batchScriptPADOVA( value, jobDir) )
281  elif mode == 'IC':
282  scriptFile.write( batchScriptIC(jobDir) )
283  scriptFile.close()
284  os.system('chmod +x %s' % scriptFileName)
285 
286  shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')
287 # jobConfig = copy.deepcopy(config)
288 # jobConfig.components = [ components[value] ]
289  cfgFile = open(jobDir+'/config.pck','w')
290  pickle.dump( components[value] , cfgFile )
291  # pickle.dump( cfo, cfgFile )
292  cfgFile.close()
293 
294 
295 if __name__ == '__main__':
296  batchManager = MyBatchManager()
297  batchManager.parser_.usage="""
298  %prog [options] <cfgFile>
299 
300  Run Colin's python analysis system on the batch.
301  Job splitting is determined by your configuration file.
302  """
303 
304  options, args = batchManager.ParseOptions()
305 
306  cfgFileName = args[0]
307 
308  handle = open(cfgFileName, 'r')
309  cfo = imp.load_source("pycfg", cfgFileName, handle)
310  config = cfo.config
311  handle.close()
312 
313  components = split( [comp for comp in config.components if len(comp.files)>0] )
314  listOfValues = range(0, len(components))
315  listOfNames = [comp.name for comp in components]
316 
317  batchManager.PrepareJobs( listOfValues, listOfNames )
318  waitingTime = 0.1
319  batchManager.SubmitJobs( waitingTime )
320 
def batchScriptCERN
Definition: heppy_batch.py:75
def batchScriptPISA
Definition: heppy_batch.py:43
def batchScriptLocal
Definition: heppy_batch.py:246
def batchScriptPADOVA
Definition: heppy_batch.py:14
def batchScriptIC
Definition: heppy_batch.py:226
def batchScriptPSI
Definition: heppy_batch.py:135
double split
Definition: MVATrainer.cc:139