CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 import sys
4 import imp
5 import copy
6 import os
7 import shutil
8 import pickle
9 import json
10 import math
11 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
12 
13 from PhysicsTools.HeppyCore.framework.heppy_loop import split
14 
def batchScriptPADOVA( index, jobDir='./'):
    '''Return the text of the bash batch script used for the PADOVA mode.

    index  -- accepted for call compatibility; it does not appear in the script.
    jobDir -- directory the script changes into before doing anything else.

    The generated script carries #BSUB directives (queue "local"), sources the
    CMS environment from /cvmfs, sets up the scram runtime, runs looper.py with
    its output redirected to local.output and exits with the looper's status.
    '''
    looper = "$CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py"
    lines = [
        # batch-system directives
        "#!/bin/bash",
        "#BSUB -q local",
        "#BSUB -J test",
        "#BSUB -o test.log",
        # move to the job directory and record the environment
        "cd {jdir}".format(jdir=jobDir),
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "eval `scram runtime -sh`",
        "ls",
        # run the job and propagate its exit status
        "echo 'running'",
        "python " + looper + " pycfg.py config.pck --options=options.json >& local.output",
        "exit $?",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD",
    ]
    return "\n".join(lines) + "\n"
43 
def batchScriptPISA( index, remoteDir=''):
    '''Return the text of the bash batch script used for the PISA mode.

    Neither index nor remoteDir appears in the generated text; both are kept
    only so that existing callers continue to work.

    The script carries a "#BSUB -q cms" directive, sources the CMS environment
    from /cvmfs, sets up the scram runtime, runs looper.py with its output
    redirected to local.output and exits with the looper's status.
    '''
    return (
        "#!/bin/bash\n"
        "#BSUB -q cms\n"
        "echo 'PWD:'\n"
        "pwd\n"
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch\n"
        "source $VO_CMS_SW_DIR/cmsset_default.sh\n"
        "echo 'environment:'\n"
        "echo\n"
        "env > local.env\n"
        "env\n"
        "# ulimit -v 3000000 # NO\n"
        "echo 'copying job dir to worker'\n"
        "###cd $CMSSW_BASE/src\n"
        "eval `scramv1 runtime -sh`\n"
        "#eval `scramv1 ru -sh`\n"
        "# cd $LS_SUBCWD\n"
        "# eval `scramv1 ru -sh`\n"
        "##cd -\n"
        "##cp -rf $LS_SUBCWD .\n"
        "ls\n"
        "echo `find . -type d | grep /`\n"
        "echo 'running'\n"
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py"
        " pycfg.py config.pck --options=options.json >& local.output\n"
        "exit $?\n"
        "#echo\n"
        "#echo 'sending the job directory back'\n"
        "#echo cp -r Loop/* $LS_SUBCWD\n"
    )
75 
def batchScriptCERN( jobDir, remoteDir=''):
    '''Return the text of the LSF batch script used for the LXPLUS mode.

    jobDir    -- job directory path; when remoteDir is set, the chunk index and
                 the component name are cut out of it around the "_Chunk"
                 marker.
    remoteDir -- '' to copy the whole Loop directory back to $LS_SUBCWD, or a
                 URL starting with "root://eoscms.cern.ch//eos/cms/store/" to
                 ship Loop/tree*.root there with gfal-copy.

    Any other remoteDir prints an error and terminates via sys.exit(1).
    '''

    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    dirCopy = """echo 'sending the logs back'  # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
   echo 'ERROR: problem copying job directory back'
else
   echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd=dirCopy
    elif remoteDir.startswith(eosPrefix):
        # NOTE(review): both slices assume jobDir contains "_Chunk"; without it
        # find() returns -1 and the pieces below are not meaningful.
        chunkPos = jobDir.find("_Chunk")
        chunkIndex = jobDir[chunkPos+6:].strip("/")
        destination = ""+remoteDir+jobDir[jobDir.rfind("/"):chunkPos]
        # The exit status of gfal-copy is saved immediately: any command run
        # in between (the former `echo $idx`) would overwrite $? and make a
        # failed copy look successful, deleting the only copy of the file.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
for f in Loop/tree*.root
do
   ff=`basename $f | cut -d . -f 1`
   echo $f
   echo $ff
   export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
   source $VO_CMS_SW_DIR/cmsset_default.sh
   echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
   gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
   copy_status=$?
   if [ $copy_status -ne 0 ]; then
      echo "ERROR: remote copy failed for file $ff"
   else
      echo "remote copy succeeded"
      rm Loop/$ff.root
   fi
done
#fi
""".format(idx=chunkIndex, srm=destination)
    else:
        # single-argument print(...) behaves the same on Python 2 and 3
        print("chosen location not supported yet: " + str(remoteDir))
        print("path must start with " + eosPrefix)
        sys.exit(1)

    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
139 
140 
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Return the text of the SGE batch script for the PSI tier3 batch system.

    index     -- number appended to the name of each root file copied to the
                 remote directory.
    jobDir    -- submission directory; copied to the worker scratch area and
                 used as the destination when results are sent back.
    remoteDir -- '' to copy the Loop directory back to jobDir only, or a path
                 starting with "/pnfs/psi.ch" to ship the root files in
                 Loop/treeProducerSusyFullHad there with gfal-copy first.

    Reads CMSSW_BASE from the environment (KeyError if it is not set). Any
    other remoteDir prints an error and terminates via sys.exit(1).
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms"  # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn

    if remoteDir=='':
        cpCmd="""echo 'sending the job directory back'
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # A root file is removed from the worker only after its own copy
        # succeeded; files whose copy failed stay in Loop and are sent back
        # together with the logs. The echoed command now uses $ff, the
        # variable the real command uses ($file was never set).
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/treeProducerSusyFullHad/*.root
do
   echo $f
   ff=`basename $f | cut -d . -f 1`
   echo $ff
   gfal-mkdir {srm}
   echo "gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$ff.root {srm}/${{ff}}_{idx}.root"
   gfal-copy file:///`pwd`/Loop/treeProducerSusyFullHad/$ff.root {srm}/${{ff}}_{idx}.root
   if [ $? -ne 0 ]; then
      echo "ERROR: remote copy failed for file $ff"
   else
      rm Loop/treeProducerSusyFullHad/$ff.root
   fi
done
echo 'sending the logs back'
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm='srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind("/"):jobDir.find("_Chunk")])
    else:
        # single-argument print(...) behaves the same on Python 2 and 3
        print("remote directory not supported yet: " + str(remoteDir))
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)

    # The first shopt line uses -s so that it enables alias expansion;
    # without -s, shopt only prints the current state of the option.
    script = """#!/bin/bash
shopt -s expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
   echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
   exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
   echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
   exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)

    return script
231 
def batchScriptIC(jobDir):
    '''Return the text of the bash batch script used for the IC mode.

    jobDir -- directory the script changes into; the looper is run from there.

    Reads CMSSW_BASE from the environment (KeyError if it is not set) and
    writes that path into the script. After the looper has run, the content
    of Loop is moved up into the job directory and Loop itself is removed.
    '''
    release = os.environ['CMSSW_BASE']
    template = "\n".join((
        "#!/bin/bash",
        "export X509_USER_PROXY=/home/hep/$USER/myproxy",
        "source /vols/cms/grid/setup.sh",
        "cd {jobdir}",
        # set up the runtime from the release area, then return to the job dir
        "cd {cmssw}/src",
        "eval `scramv1 ru -sh`",
        "cd -",
        "echo 'running'",
        "python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py"
        " pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./ && rm -r Loop",
    )) + "\n"
    return template.format(jobdir=jobDir, cmssw=release)
251 
def batchScriptLocal( remoteDir, index ):
    '''Return the text of the bash script used for the LOCAL mode.

    Neither remoteDir nor index appears in the generated text. The script
    runs looper.py and then moves the content of Loop into the current
    directory.
    '''
    return (
        "#!/bin/bash\n"
        "echo 'running'\n"
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py"
        " pycfg.py config.pck --options=options.json\n"
        "echo\n"
        "echo 'sending the job directory back'\n"
        "mv Loop/* ./\n"
    )
263 
264 
class MyBatchManager( BatchManager ):
    '''Batch manager specific to cmsRun processes.'''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        jobDir -- directory that receives the files of this job.
        value  -- index of the job's component in the module-level
                  `components` list.

        Writes batchScript.sh (made executable), a copy of the configuration
        file as pycfg.py, the pickled component as config.pck and, when the
        manager has a heppyOptions_ attribute, options.json.

        Relies on the module-level names `components`, `options` and
        `cfgFileName` set by the script section of this file.
        '''
        print(value)
        print(components[value])

        # Build the script text before opening the output file, so that a
        # builder that exits or raises does not leave an open file handle.
        scriptFileName = jobDir+'/batchScript.sh'
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        scriptText = ''  # an unrecognised mode yields an empty script, as before
        if mode == 'LXPLUS':
            scriptText = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            scriptText = batchScriptPSI( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            scriptText = batchScriptLocal( storeDir, value )
        elif mode == 'PISA':
            # arguments now follow the (index, remoteDir) signature
            scriptText = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA':
            scriptText = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            scriptText = batchScriptIC( jobDir )

        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write(scriptText)

        # Add execute permission wherever read permission is already granted,
        # without going through a shell (safe for paths with spaces).
        currentMode = os.stat(scriptFileName).st_mode
        os.chmod(scriptFileName, currentMode | ((currentMode & 0o444) >> 2))

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # pickle data is binary: open the file in binary mode
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump( components[value] , cfgFile )

        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
304 
if __name__ == '__main__':
    # Script entry point: parse the options, load the configuration file,
    # split its components into jobs, then prepare and submit them.
    # `options`, `components` and `cfgFileName` must stay module-level names:
    # MyBatchManager.PrepareJobUser reads them as globals.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
    %prog [options] <cfgFile>

    Run Colin's python analysis system on the batch.
    Job splitting is determined by your configuration file.
    """

    options, args = batchManager.ParseOptions()

    # Without this check a missing cfg file shows up as a bare IndexError.
    if not args:
        sys.stderr.write("ERROR: no configuration file given\n")
        sys.stderr.write("usage: heppy_batch.py [options] <cfgFile>\n")
        sys.exit(1)

    # Extra options are "key=value" pairs or bare flags (stored as True).
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True
    batchManager.heppyOptions_=_heppyGlobalOptions

    cfgFileName = args[0]

    # `with` closes the handle even if loading the configuration raises
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
        config = cfo.config

    # components without input files are dropped before splitting
    components = split( [comp for comp in config.components if len(comp.files)>0] )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
340 
def batchScriptCERN
Definition: heppy_batch.py:76
def batchScriptPISA
Definition: heppy_batch.py:44
def batchScriptLocal
Definition: heppy_batch.py:252
def batchScriptPADOVA
Definition: heppy_batch.py:15
def batchScriptIC
Definition: heppy_batch.py:232
def batchScriptPSI
Definition: heppy_batch.py:141
double split
Definition: MVATrainer.cc:139