CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 import sys
4 import imp
5 import copy
6 import os
7 import shutil
8 import pickle
9 import math
10 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
11 
12 from PhysicsTools.HeppyCore.framework.heppy import split
13 
14 
def batchScriptPISA( index, remoteDir=''):
    '''Return the text of the LSF batch script (queue "cms") for PISA jobs.

    Both arguments are accepted for symmetry with the other batchScript*
    functions but are not used: the same text is returned for every job.

    The script sources the CMS environment from /cvmfs/cms.cern.ch, runs
    the Heppy looper on pycfg.py and config.pck from the current
    directory with its output redirected to local.output, and exits with
    the looper's exit status.
    '''
    # One entry per line of the generated shell script.
    shell_lines = [
        "#!/bin/bash",
        "#BSUB -q cms",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "###cd $CMSSW_BASE/src",
        "eval `scramv1 runtime -sh`",
        "#eval `scramv1 ru -sh`",
        "# cd $LS_SUBCWD",
        "# eval `scramv1 ru -sh`",
        "##cd -",
        "##cp -rf $LS_SUBCWD .",
        "ls",
        "echo `find . -type d | grep /`",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck >& local.output",
        "exit $?",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD",
    ]
    # Every line, including the last one, is newline-terminated.
    return "\n".join(shell_lines) + "\n"
46 
47 
def batchScriptCERN( index, remoteDir=''):
    '''Return the text of the LSF batch script (queue "8nm") for CERN jobs.

    Both arguments are accepted but not used: the same text is returned
    for every job.

    The script sets up the CMSSW runtime from $CMSSW_BASE/src, copies the
    submission directory ($LS_SUBCWD) into the worker's directory, runs
    the Heppy looper on pycfg.py and config.pck there, and copies the
    content of Loop/ back to $LS_SUBCWD.
    '''
    return (
        "#!/bin/bash\n"
        "#BSUB -q 8nm\n"
        "echo 'environment:'\n"
        "echo\n"
        "env\n"
        "# ulimit -v 3000000 # NO\n"
        "echo 'copying job dir to worker'\n"
        "cd $CMSSW_BASE/src\n"
        "eval `scramv1 ru -sh`\n"
        "# cd $LS_SUBCWD\n"
        "# eval `scramv1 ru -sh`\n"
        "cd -\n"
        "cp -rf $LS_SUBCWD .\n"
        "ls\n"
        "cd `find . -type d | grep /`\n"
        "echo 'running'\n"
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck\n"
        "echo\n"
        "echo 'sending the job directory back'\n"
        "cp -r Loop/* $LS_SUBCWD\n"
    )
72 
73 
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Prepare the SGE version of the batch script, to run on the PSI tier3 batch system.

    index     -- job index; appended to the name of every root file
                 copied to the storage element.
    jobDir    -- submission directory. It is copied to a scratch work
                 directory on the worker and receives the content of
                 Loop/ at the end of the job. Its last path component,
                 up to an optional "_Chunk" suffix, names the
                 sub-directory created below remoteDir.
    remoteDir -- '' (default): copy everything in Loop/ back to jobDir.
                 A path starting with "/pnfs/psi.ch": gfal-copy the root
                 files of Loop/treeProducerSusyFullHad to
                 srm://t3se01.psi.ch<remoteDir>/<job name>, remove them
                 locally, then copy the rest of Loop/ back to jobDir.
                 Anything else: print an error and exit with status 1.

    Returns the script text. Raises KeyError if CMSSW_BASE is not set in
    the environment.
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms"  # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn

    if remoteDir=='':
        cpCmd = """echo 'sending the job directory back'
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # Name of the remote sub-directory: last component of jobDir with
        # the "_Chunk..." suffix removed. The suffix is only stripped when
        # present, so a job directory without it keeps its full name.
        jobName = jobDir.rstrip("/")
        jobName = jobName[jobName.rfind("/") + 1:]
        chunkPos = jobName.find("_Chunk")
        if chunkPos != -1:
            jobName = jobName[:chunkPos]
        srmDir = 'srm://t3se01.psi.ch' + remoteDir + '/' + jobName

        # "${{ff}}" is an escaped brace pair: str.format turns it into the
        # shell expansion "${ff}". The logging echo names $ff so that it
        # shows exactly the command executed on the next line.
        cpCmd = """echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/treeProducerSusyFullHad/*.root
do
echo $f
ff=`basename $f | cut -d . -f 1`
echo $ff
gfal-mkdir {srm}
echo "gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$ff.root {srm}/${{ff}}_{idx}.root"
gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$ff.root {srm}/${{ff}}_{idx}.root
done
rm Loop/treeProducerSusyFullHad/*.root
echo 'sending the logs back'
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm=srmDir)
    else:
        # Single-argument print(...) behaves the same under Python 2 and 3.
        print("remote directory not supported yet:  %s" % remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)

    # NOTE(review): unlike the other batchScript* functions in this file,
    # this script runs the CMGTools/RootTools looper rather than the
    # PhysicsTools/HeppyCore one -- confirm this is intended.
    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
 echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
 exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
 echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
 exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
python {cmssw}/src/CMGTools/RootTools/python/fwlite/looper.py pycfg.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR, cmssw=cmssw_release, copy=cpCmd)

    return script
164 
def batchScriptIC(jobDir):
    '''Return the text of the batch script for the IC running mode.

    jobDir -- directory the script changes into before running the job.

    The script sets up the grid and CMSSW environments, runs the Heppy
    looper on pycfg.py and config.pck inside jobDir and moves the content
    of Loop/ up into jobDir. The CMSSW release area is read from the
    CMSSW_BASE environment variable when the script is generated; a
    KeyError is raised if it is not set.
    '''
    release_area = os.environ['CMSSW_BASE']
    looper_path = '{0}/src/PhysicsTools/HeppyCore/python/framework/looper.py'.format(release_area)

    # One entry per line of the generated shell script.
    commands = [
        "#!/bin/bash",
        "export X509_USER_PROXY=/home/hep/$USER/myproxy",
        "source /vols/cms/grid/setup.sh",
        "cd {0}".format(jobDir),
        "cd {0}/src".format(release_area),
        "eval `scramv1 ru -sh`",
        "cd -",
        "echo 'running'",
        "python {0} pycfg.py config.pck".format(looper_path),
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./",
    ]
    return "\n".join(commands) + "\n"
184 
def batchScriptLocal( remoteDir, index ):
    '''Prepare a local version of the batch script, to run using nohup.

    Both arguments are accepted but not used: the same text is returned
    for every job. Note the parameter order (remoteDir, index), which is
    the reverse of batchScriptCERN and batchScriptPISA.

    The script runs the Heppy looper on pycfg.py and config.pck in the
    current directory and then moves the content of Loop/ up into it.
    '''
    # The bare "echo" (prints an empty line) must be a command of its own:
    # written on the looper line, the shell hands the word "echo" to
    # looper.py as an extra command-line argument.
    script = """#!/bin/bash
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck
echo
echo 'sending the job directory back'
mv Loop/* ./
"""
    return script
195 
196 
class MyBatchManager( BatchManager ):
    '''Batch manager that prepares one Heppy looper job per component.

    PrepareJobUser reads three module-level names that are set by the
    script's __main__ block: options (parsed command-line options),
    cfgFileName (path of the user configuration file) and components
    (the list of components, indexed by job value).
    '''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        jobDir -- directory of the job; receives batchScript.sh, pycfg.py
                  and config.pck.
        value  -- index of the job's component in the global components
                  list.

        Raises ValueError when the running mode has no batch script.
        '''
        print(value)
        print(components[value])

        # Choose the batch script for the running mode. The text is built
        # before any file is opened so a failure leaves no empty script.
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        if mode == 'LXPLUS':
            scriptText = batchScriptCERN( value, storeDir )
        elif mode == 'PSI':
            scriptText = batchScriptPSI( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            # batchScriptLocal takes (remoteDir, index), in that order.
            scriptText = batchScriptLocal( storeDir, value )
        elif mode == 'PISA':
            scriptText = batchScriptPISA( value, storeDir )
        elif mode == 'IC':
            scriptText = batchScriptIC( jobDir )
        else:
            raise ValueError('no batch script available for running mode %r' % (mode,))

        scriptFileName = jobDir+'/batchScript.sh'
        with open(scriptFileName, 'w') as scriptFile:
            scriptFile.write( scriptText )

        # Make the script executable without going through a shell (safe
        # for paths with spaces): whoever may read it may also execute it.
        permissions = os.stat(scriptFileName).st_mode
        os.chmod(scriptFileName, permissions | ((permissions & 0o444) >> 2))

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # Pickle only this job's component; pickles are written in binary
        # mode.
        with open(jobDir+'/config.pck', 'wb') as cfgFile:
            pickle.dump( components[value], cfgFile )
230 
231 
if __name__ == '__main__':
    # Script entry point: load the user configuration, split its
    # components into jobs, then prepare and submit one job per component.
    # options, cfgFileName and components are deliberately module-level:
    # MyBatchManager.PrepareJobUser reads them as globals.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
    %prog [options] <cfgFile>

    Run Colin's python analysis system on the batch.
    Job splitting is determined by your configuration file.
    """

    options, args = batchManager.ParseOptions()

    # Report a missing configuration file as a usage error instead of
    # failing with a bare IndexError.
    # NOTE(review): assumes parser_ is an optparse/argparse style parser
    # providing error(); the "%prog" placeholder suggests optparse.
    if not args:
        batchManager.parser_.error('missing <cfgFile> argument')
    cfgFileName = args[0]

    # Import the configuration file as module "pycfg"; the with-block
    # closes the file even if the configuration raises while loading.
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
    config = cfo.config

    # Components without input files are dropped before splitting.
    components = split( [comp for comp in config.components if len(comp.files)>0] )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
257 
def batchScriptCERN
Definition: heppy_batch.py:48
def batchScriptPISA
Definition: heppy_batch.py:15
def batchScriptLocal
Definition: heppy_batch.py:185
def batchScriptIC
Definition: heppy_batch.py:165
def batchScriptPSI
Definition: heppy_batch.py:74
double split
Definition: MVATrainer.cc:139