CMS 3D CMS Logo

heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 from __future__ import print_function
4 import sys
5 import imp
6 import copy
7 import os
8 import shutil
9 import pickle
10 import json
11 import math
12 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
13 
14 from PhysicsTools.HeppyCore.framework.heppy_loop import split
15 
def batchScriptPADOVA( index, jobDir='./'):
    '''Return the text of the LSF batch script used for the PADOVA mode.

    index  -- job index. Accepted for parity with the other batchScript*
              helpers; it is not used in the generated script.
    jobDir -- directory the job changes into before doing anything else;
              substituted verbatim into the script's ``cd`` line.

    The generated script sources the CMS environment from /cvmfs, runs
    looper.py on pycfg.py / config.pck with options.json, redirects the
    looper output to local.output and exits with the looper's status.
    '''
    template = """#!/bin/bash
#BSUB -q local
#BSUB -J test
#BSUB -o test.log
cd {jdir}
echo 'PWD:'
pwd
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
echo 'environment:'
echo
env > local.env
env
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
eval `scram runtime -sh`
ls
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
exit $?
#echo
#echo 'sending the job directory back'
#echo cp -r Loop/* $LS_SUBCWD
"""
    # Only {jdir} is a placeholder; the template contains no other braces.
    return template.format(jdir=jobDir)
44 
def batchScriptPISA( index, remoteDir=''):
    '''Return the text of the LSF batch script used for the PISA mode.

    index     -- job index; not used in the generated script.
    remoteDir -- remote output directory; not used in the generated script.

    The script is a fixed string (no substitutions). It sources the CMS
    environment from /cvmfs, runs looper.py on pycfg.py / config.pck with
    options.json, redirects the looper output to local.output and exits
    with the looper's status.
    '''
    return """#!/bin/bash
#BSUB -q cms
echo 'PWD:'
pwd
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
echo 'environment:'
echo
env > local.env
env
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
###cd $CMSSW_BASE/src
eval `scramv1 runtime -sh`
#eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
##cd -
##cp -rf $LS_SUBCWD .
ls
echo `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
exit $?
#echo
#echo 'sending the job directory back'
#echo cp -r Loop/* $LS_SUBCWD
"""
76 
def batchScriptCERN( jobDir, remoteDir=''):
    '''Return the text of the LSF batch script used for the LXPLUS mode.

    jobDir    -- job directory. Its last path component, up to an optional
                 "_Chunk<N>" suffix, names the remote sub-directory; the
                 text after "_Chunk" is used as the output file index
                 ('all' when there is no "_Chunk").
    remoteDir -- '' to copy everything in Loop/ back to $LS_SUBCWD, or a
                 location starting with
                 "root://eoscms.cern.ch//eos/cms/store/" to stage the
                 Loop/*/tree*.root files out to EOS (up to three attempts
                 each, with a file-size comparison) before copying the
                 rest of Loop/ back.

    Any other remoteDir prints an error and terminates the process through
    sys.exit(1).
    '''
    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
   echo 'ERROR: problem copying job directory back'
else
   echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd = dirCopy
    elif remoteDir.startswith(eosPrefix):
        # Split ".../<name>_Chunk<N>" into the component name and the index.
        chunkPos = jobDir.find("_Chunk")
        if chunkPos >= 0:
            idx = jobDir[chunkPos+6:].strip("/")
            nameEnd = chunkPos
        else:
            idx = 'all'
            nameEnd = len(jobDir)
        # eos.select wants a plain /eos/... path, so the redirector prefix
        # is removed from the destination.
        srm = (remoteDir + jobDir[jobDir.rfind("/"):nameEnd]).replace("root://eoscms.cern.ch/","")

        # Doubled braces are literal shell braces; {srm} and {idx} are
        # filled in by format().
        cpCmd = """echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
for f in Loop/*/tree*.root
do
   rm Loop/cmsswPreProcessing.root
   ff=`echo $f | cut -d/ -f2`
   ff="${{ff}}_`basename $f | cut -d . -f 1`"
   echo $f
   echo $ff
   export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
   source $VO_CMS_SW_DIR/cmsset_default.sh
   for try in `seq 1 3`; do
      echo "Stageout try $try"
      echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
      /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
      echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
      /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
      if [ $? -ne 0 ]; then
         echo "ERROR: remote copy failed for file $ff"
         continue
      fi
      echo "remote copy succeeded"
      remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
      locsize=$(cat `pwd`/$f | wc -c)
      ok=$(($remsize==$locsize))
      if [ $ok -ne 1 ]; then
         echo "Problem with copy (file sizes don't match), will retry in 30s"
         sleep 30
         continue
      fi
      echo "everything ok"
      rm $f
      echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
      break
   done
done
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
   echo 'ERROR: problem copying job directory back'
else
   echo 'job directory copy succeeded'
fi
""".format(idx=idx, srm=srm)
    else:
        print("chosen location not supported yet: ", remoteDir)
        # Report the prefix that is actually tested above.
        print('path must start with ' + eosPrefix)
        sys.exit(1)

    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
165 
166 
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Return the text of the SGE batch script for the PSI tier3 batch system.

    index     -- job index, appended to the name of every file staged out
                 to the remote directory.
    jobDir    -- submission directory; copied to the worker scratch area
                 and used as the destination when results are copied back.
                 Its last path component, up to an optional "_Chunk"
                 suffix, names the remote sub-directory.
    remoteDir -- '' to copy Loop/ back to the submission directory only, or
                 a path starting with "/pnfs/psi.ch" to first copy the
                 Loop/mt2*.root files to srm://t3se01.psi.ch with gfal-copy.

    Reads CMSSW_BASE from the environment (KeyError if it is not set). Any
    other remoteDir prints an error and terminates the process through
    sys.exit(1).
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms" # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn

    if remoteDir=='':
        cpCmd = """echo 'sending the job directory back'
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # str.find returns -1 when "_Chunk" is absent; slicing up to -1 would
        # drop the last character of the directory name, so fall back to the
        # full length (same rule as batchScriptCERN).
        chunkPos = jobDir.find("_Chunk")
        nameEnd = chunkPos if chunkPos >= 0 else len(jobDir)
        srm = 'srm://t3se01.psi.ch' + remoteDir + jobDir[jobDir.rfind("/"):nameEnd]

        # Doubled braces are literal shell braces; {srm} and {idx} are
        # filled in by format().
        cpCmd = """echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/mt2*.root
do
   ff=`basename $f | cut -d . -f 1`
   #d=`echo $f | cut -d / -f 2`
   gfal-mkdir {srm}
   echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
   gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
   if [ $? -ne 0 ]; then
      echo "ERROR: remote copy failed for file $ff"
   else
      echo "remote copy succeeded"
      rm Loop/$ff.root
   fi
done
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm=srm)
    else:
        print("remote directory not supported yet: ", remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)


    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
   echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
   exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
   echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
   exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR, cmssw=cmssw_release, copy=cpCmd)

    return script
262 
def batchScriptIC(jobDir):
    '''Return the text of the batch script used for the IC mode.

    jobDir -- directory the job runs in; the script changes into it, sets
              up the CMSSW runtime from the release area, then returns to
              it with ``cd -`` before running the looper.

    Reads CMSSW_BASE from the environment (KeyError if it is not set) and
    writes its value into the script, both for the runtime setup and for
    the path to looper.py. After the looper has run, the contents of Loop/
    are moved into the job directory and Loop/ is removed.
    '''
    cmssw_release = os.environ['CMSSW_BASE']
    template = """#!/bin/bash
export X509_USER_PROXY=/home/hep/$USER/myproxy
source /vols/cms/grid/setup.sh
cd {jobdir}
cd {cmssw}/src
eval `scramv1 ru -sh`
cd -
echo 'running'
python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
echo 'sending the job directory back'
mv Loop/* ./ && rm -r Loop
"""
    return template.format(jobdir=jobDir, cmssw=cmssw_release)
282 
def batchScriptLocal( remoteDir, index ):
    '''Return the text of the script used for the LOCAL mode (run via nohup).

    remoteDir -- not used in the generated script.
    index     -- not used in the generated script.

    Note that the parameter order is (remoteDir, index), the reverse of the
    other batchScript* helpers that take an index.

    The script runs looper.py on pycfg.py / config.pck with options.json
    and then moves the contents of Loop/ into the current directory.
    '''
    return """#!/bin/bash
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
echo 'sending the job directory back'
mv Loop/* ./
"""
294 
295 
class MyBatchManager( BatchManager ):
    '''Batch manager specific to cmsRun processes.'''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        jobDir -- directory of the job; receives batchScript.sh, pycfg.py,
                  config.pck and, when heppy options are set, options.json.
        value  -- index of the component to run, used to look it up in the
                  module-level ``components`` list.

        Relies on the module-level names ``components``, ``options`` and
        ``cfgFileName`` assigned in the ``__main__`` section of this file.
        '''
        print(value)
        print(components[value])

        # Pick the batch script for the running mode. The text is built
        # before the file is opened, so a generator that bails out does not
        # leave a half-written script behind.
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        # A mode matching none of the branches yields an empty script, as
        # before. NOTE(review): presumably RunningMode only returns the
        # modes handled here -- confirm against BatchManager.
        scriptText = ''
        if mode == 'LXPLUS':
            scriptText = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            scriptText = batchScriptPSI( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            # batchScriptLocal takes (remoteDir, index): reversed order.
            scriptText = batchScriptLocal( storeDir, value )
        elif mode == 'PISA':
            # Arguments now follow the (index, remoteDir) signature.
            scriptText = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA':
            scriptText = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            scriptText = batchScriptIC( jobDir )

        scriptFileName = jobDir+'/batchScript.sh'
        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write(scriptText)
        # Same effect as "chmod +x" without going through a shell: grant
        # execute permission wherever read permission is already granted.
        perms = os.stat(scriptFileName).st_mode
        os.chmod(scriptFileName, perms | ((perms & 0o444) >> 2))

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # pickle produces bytes, so the file must be opened in binary mode.
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump( components[value], cfgFile )

        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
335 
# Script entry point: parse the command line, load the user configuration,
# split its components into jobs, then prepare and submit them.
if __name__ == '__main__':
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
    %prog [options] <cfgFile>

    Run Colin's python analysis system on the batch.
    Job splitting is determined by your configuration file.
    """

    options, args = batchManager.ParseOptions()

    # Turn every extra option into a heppy global option: "key=value"
    # stores the string value, a bare "key" stores True.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True
    batchManager.heppyOptions_=_heppyGlobalOptions

    cfgFileName = args[0]

    # Load the configuration module; the with-block closes the file even if
    # the configuration raises while it is being executed.
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
        config = cfo.config

    # Components without input files are dropped before splitting.
    components = split( [comp for comp in config.components if len(comp.files)>0] )
    listOfValues = list(range(0, len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
371 
def batchScriptPADOVA(index, jobDir='./')
Definition: heppy_batch.py:16
def batchScriptIC(jobDir)
Definition: heppy_batch.py:263
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
Definition: heppy_batch.py:45
def batchScriptPSI(index, jobDir, remoteDir='')
Definition: heppy_batch.py:167
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def PrepareJobUser(self, jobDir, value)
Definition: heppy_batch.py:299
def batchScriptLocal(remoteDir, index)
Definition: heppy_batch.py:283
double split
Definition: MVATrainer.cc:139
def batchScriptCERN(jobDir, remoteDir='')
Definition: heppy_batch.py:77