CMS 3D CMS Logo

heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 import sys
4 import imp
5 import copy
6 import os
7 import shutil
8 import pickle
9 import json
10 import math
11 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
12 
13 from PhysicsTools.HeppyCore.framework.heppy_loop import split
14 
def batchScriptPADOVA( index, jobDir='./'):
    '''Build the bash batch script (with #BSUB directives) for the PADOVA queue.

    index  -- accepted for interface compatibility; not used by the template.
    jobDir -- directory the job changes into before setting up the CMS
              environment and running the heppy looper.

    Returns the full script text as a string.
    '''
    lines = (
        "#!/bin/bash",
        "#BSUB -q local",
        "#BSUB -J test",
        "#BSUB -o test.log",
        "cd {jdir}",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "eval `scram runtime -sh`",
        "ls",
        "echo 'running'",
        # stdout and stderr of the looper both end up in local.output
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output",
        "exit $?",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD",
    )
    template = "\n".join(lines) + "\n"
    return template.format(jdir=jobDir)
43 
def batchScriptPISA( index, remoteDir=''):
    '''Build the bash batch script (with a #BSUB queue directive) for PISA.

    Both arguments are accepted for interface compatibility only: the
    template is constant and does not depend on them.

    Returns the full script text as a string.
    '''
    lines = [
        "#!/bin/bash",
        "#BSUB -q cms",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "###cd $CMSSW_BASE/src",
        "eval `scramv1 runtime -sh`",
        "#eval `scramv1 ru -sh`",
        "# cd $LS_SUBCWD",
        "# eval `scramv1 ru -sh`",
        "##cd -",
        "##cp -rf $LS_SUBCWD .",
        "ls",
        "echo `find . -type d | grep /`",
        "echo 'running'",
        # stdout and stderr of the looper both end up in local.output
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output",
        "exit $?",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD",
    ]
    return "\n".join(lines) + "\n"
75 
def batchScriptCERN( jobDir, remoteDir=''):
    '''Prepare the LSF batch script used to run one job at CERN.

    jobDir    -- job directory; its last path component is expected to look
                 like "<sample>" or "<sample>_Chunk<N>".
    remoteDir -- '' to copy the whole output back to $LS_SUBCWD, or an EOS
                 location starting with
                 "root://eoscms.cern.ch//eos/cms/store/" to stage the
                 tree*.root files out to EOS (everything left in Loop/ is
                 still copied back to $LS_SUBCWD afterwards).

    Returns the script text. For any other remoteDir an error is printed
    and the process exits with status 1.
    '''
    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
    echo 'ERROR: problem copying job directory back'
else
    echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd=dirCopy
    elif remoteDir.startswith(eosPrefix):
        # Derive sample name and chunk index from the LAST path component
        # only. Slicing the full path (as was done before) produced an empty
        # or wrong name when jobDir had no '/', ended with '/', or when a
        # parent directory happened to contain "_Chunk".
        jobName = os.path.basename(jobDir.rstrip('/'))
        chunkPos = jobName.rfind('_Chunk')
        if chunkPos >= 0:
            sample = jobName[:chunkPos]
            idx = jobName[chunkPos+len('_Chunk'):]
        else:
            sample = jobName
            idx = 'all'
        # eos.select wants a plain /eos/... path, so drop the xrootd prefix.
        srm = (remoteDir+'/'+sample).replace("root://eoscms.cern.ch/","")

        # Doubled braces are literal shell braces protected from str.format.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
for f in Loop/*/tree*.root
do
    rm Loop/cmsswPreProcessing.root
    ff=`echo $f | cut -d/ -f2`
    ff="${{ff}}_`basename $f | cut -d . -f 1`"
    echo $f
    echo $ff
    export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
    source $VO_CMS_SW_DIR/cmsset_default.sh
    for try in `seq 1 3`; do
        echo "Stageout try $try"
        echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
        /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
        echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
        /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
        if [ $? -ne 0 ]; then
            echo "ERROR: remote copy failed for file $ff"
            continue
        fi
        echo "remote copy succeeded"
        remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
        locsize=$(cat `pwd`/$f | wc -c)
        ok=$(($remsize==$locsize))
        if [ $ok -ne 1 ]; then
            echo "Problem with copy (file sizes don't match), will retry in 30s"
            sleep 30
            continue
        fi
        echo "everything ok"
        rm $f
        echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
        break
    done
done
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
    echo 'ERROR: problem copying job directory back'
else
    echo 'job directory copy succeeded'
fi
""".format(idx=idx, srm=srm)
    else:
        # Single-argument print() behaves the same under Python 2 and 3.
        print("chosen location not supported yet:  %s" % remoteDir)
        print('path must start with ' + eosPrefix)
        sys.exit(1)

    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
164 
165 
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Prepare the SGE version of the batch script, to run on the PSI tier3 batch system.

    index     -- job index, appended to the names of the staged-out files.
    jobDir    -- submission directory of the job; its last path component is
                 expected to look like "<sample>" or "<sample>_Chunk<N>".
    remoteDir -- '' to copy the output back to the submission directory only,
                 or a path starting with "/pnfs/psi.ch" to additionally
                 stage Loop/mt2*.root out with gfal-copy.

    Returns the script text. For any other remoteDir an error is printed and
    the process exits with status 1. Raises KeyError when CMSSW_BASE is not
    set in the environment.
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms" # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn

    if remoteDir=='':
        cpCmd="""echo 'sending the job directory back'
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # Take the sample name from the last path component. The previous
        # slice jobDir[rfind("/"):find("_Chunk")] silently dropped the last
        # character of the name when there was no "_Chunk" suffix
        # (find() == -1) and broke when a parent directory contained "_Chunk".
        jobName = os.path.basename(jobDir.rstrip('/'))
        chunkPos = jobName.rfind('_Chunk')
        sample = jobName[:chunkPos] if chunkPos >= 0 else jobName
        srm = 'srm://t3se01.psi.ch'+remoteDir+'/'+sample

        # Doubled braces are literal shell braces protected from str.format.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/mt2*.root
do
    ff=`basename $f | cut -d . -f 1`
    #d=`echo $f | cut -d / -f 2`
    gfal-mkdir {srm}
    echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
    gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
    if [ $? -ne 0 ]; then
        echo "ERROR: remote copy failed for file $ff"
    else
        echo "remote copy succeeded"
        rm Loop/$ff.root
    fi
done
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm=srm)
    else:
        # Single-argument print() behaves the same under Python 2 and 3.
        print("remote directory not supported yet:  %s" % remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)


    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
    echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
    exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
    echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
    exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)

    return script
261 
def batchScriptIC(jobDir):
    '''Build the bash batch script for the IC site.

    jobDir -- directory the job changes into; after the CMSSW environment is
              set up the script returns there ("cd -") to run the looper.

    The CMSSW release area is read from the CMSSW_BASE environment variable
    (KeyError if it is not set). Returns the script text as a string.
    '''
    releaseBase = os.environ['CMSSW_BASE']
    lines = [
        "#!/bin/bash",
        "export X509_USER_PROXY=/home/hep/$USER/myproxy",
        "source /vols/cms/grid/setup.sh",
        "cd {jobdir}",
        "cd {cmssw}/src",
        "eval `scramv1 ru -sh`",
        "cd -",
        "echo 'running'",
        "python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        # flatten the looper output into the job directory itself
        "mv Loop/* ./ && rm -r Loop",
    ]
    template = "\n".join(lines) + "\n"
    return template.format(jobdir=jobDir, cmssw=releaseBase)
281 
def batchScriptLocal( remoteDir, index ):
    '''Build the bash script used to run a job locally (e.g. under nohup).

    Both arguments are accepted for interface compatibility only; the
    script text does not depend on them.
    '''
    lines = (
        "#!/bin/bash",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./",
    )
    return "\n".join(lines) + "\n"
293 
294 
class MyBatchManager( BatchManager ):
    '''Batch manager that prepares job directories for the heppy looper.'''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        jobDir -- directory of the job; the files below are written into it.
        value  -- index into the module-level ``components`` list selecting
                  the component this job processes.

        Writes batchScript.sh (made executable), a copy of the configuration
        file as pycfg.py, the pickled component as config.pck and, when
        ``self.heppyOptions_`` is set, options.json.

        Relies on the module-level names ``components``, ``options`` and
        ``cfgFileName`` set up by the ``__main__`` section of this file.
        '''
        print(value)
        print(components[value])

        # Build the script text first so that a failure in one of the
        # generators does not leave a half-written, open file behind.
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        if mode == 'LXPLUS':
            script = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            script = batchScriptPSI( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            script = batchScriptLocal( storeDir, value )
        elif mode == 'PISA':
            # signature is (index, remoteDir); the arguments used to be
            # passed the other way round (harmless, both are unused there)
            script = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA':
            script = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            script = batchScriptIC( jobDir )
        else:
            # NOTE(review): as before, an unrecognised mode yields an empty
            # script; presumably RunningMode rejects such modes -- confirm.
            script = ''

        scriptFileName = jobDir+'/batchScript.sh'
        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write( script )
        # Equivalent of "chmod +x" without going through a shell (the path
        # was previously interpolated unquoted into os.system): grant execute
        # permission wherever read permission is already present.
        perms = os.stat(scriptFileName).st_mode
        os.chmod(scriptFileName, perms | ((perms & 0o444) >> 2))

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # pickle data is binary: open the file accordingly
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump( components[value] , cfgFile )

        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
334 
if __name__ == '__main__':
    # Entry point: parse the command line, load the heppy configuration,
    # split its components into jobs, then prepare and submit them.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
    %prog [options] <cfgFile>

    Run Colin's python analysis system on the batch.
    Job splitting is determined by your configuration file.
    """

    options, args = batchManager.ParseOptions()

    # Extra options are given as "key=value" or as a bare "key" (-> True);
    # they are forwarded to every job through options.json.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        key, sep, val = opt.partition("=")
        _heppyGlobalOptions[key] = val if sep else True
    batchManager.heppyOptions_=_heppyGlobalOptions

    if not args:
        # fail with a readable message instead of an IndexError traceback
        sys.exit('ERROR: missing <cfgFile> argument')
    cfgFileName = args[0]

    # the context manager closes the file even if loading the config raises
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
    config = cfo.config

    # components without input files are dropped before splitting
    components = split( [comp for comp in config.components if len(comp.files)>0] )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
370 
def batchScriptPADOVA(index, jobDir='./')
Definition: heppy_batch.py:15
def batchScriptIC(jobDir)
Definition: heppy_batch.py:262
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
Definition: heppy_batch.py:44
def batchScriptPSI(index, jobDir, remoteDir='')
Definition: heppy_batch.py:166
def PrepareJobUser(self, jobDir, value)
Definition: heppy_batch.py:298
def batchScriptLocal(remoteDir, index)
Definition: heppy_batch.py:282
double split
Definition: MVATrainer.cc:139
def batchScriptCERN(jobDir, remoteDir='')
Definition: heppy_batch.py:76