CMS 3D CMS Logo

heppy_batch.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 from __future__ import print_function
4 from builtins import range
5 import sys
6 import imp
7 import copy
8 import os
9 import shutil
10 import pickle
11 import json
12 import math
13 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
14 
15 from PhysicsTools.HeppyCore.framework.heppy_loop import split
16 
def batchScriptPADOVA( index, jobDir='./'):
    '''Return the bash batch script used for the PADOVA running mode.

    The script carries #BSUB directives (queue "local"), changes into
    jobDir, sets up the CMS environment from /cvmfs, runs the heppy looper
    with its output redirected to local.output, and exits with the looper's
    status.

    index  -- accepted for signature compatibility; not used.
    jobDir -- directory substituted into the script's "cd" line.
    '''
    # One entry per script line; "{jdir}" is the only substitution field.
    lines = (
        "#!/bin/bash",
        "#BSUB -q local",
        "#BSUB -J test",
        "#BSUB -o test.log",
        "cd {jdir}",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "eval `scram runtime -sh`",
        "ls",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output",
        "exit $?",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD",
    )
    template = "\n".join(lines) + "\n"
    return template.format(jdir=jobDir)
45 
def batchScriptPISA( index, remoteDir=''):
    '''Return the bash batch script used for the PISA running mode.

    The script carries a #BSUB directive (queue "cms"), sets up the CMS
    environment from /cvmfs, runs the heppy looper with its output
    redirected to local.output, and exits with the looper's status.

    Neither index nor remoteDir is used; the returned text is constant.
    '''
    lines = (
        "#!/bin/bash",
        "#BSUB -q cms",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "###cd $CMSSW_BASE/src",
        "eval `scramv1 runtime -sh`",
        "#eval `scramv1 ru -sh`",
        "# cd $LS_SUBCWD",
        "# eval `scramv1 ru -sh`",
        "##cd -",
        "##cp -rf $LS_SUBCWD .",
        "ls",
        "echo `find . -type d | grep /`",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output",
        "exit $?",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD",
    )
    return "\n".join(lines) + "\n"
77 
def batchScriptCERN( jobDir, remoteDir=''):
    '''Return the bash batch script used for the LXPLUS (CERN LSF) running mode.

    The script copies the submission directory ($LS_SUBCWD) to the worker,
    runs the heppy looper there and then ships the results back.

    jobDir    -- job directory; its last path component (up to "_Chunk", if
                 present) names the remote sub-directory, and the text after
                 "_Chunk" becomes the file index ("all" when there is no
                 chunk suffix).
    remoteDir -- '' to copy everything in Loop/ back to $LS_SUBCWD, or a
                 location starting with
                 "root://eoscms.cern.ch//eos/cms/store/" to stage the
                 Loop/*/tree*.root files out with eos.select first.

    Any other remoteDir prints an error and terminates via sys.exit(1).
    '''
    # The only remote location form that is accepted.
    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
    echo 'ERROR: problem copying job directory back'
else
    echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd=dirCopy
    elif remoteDir.startswith(eosPrefix):
        hasChunk = '_Chunk' in jobDir
        # Without a chunk suffix the whole last path component is the sample name.
        chunkPos = jobDir.find("_Chunk") if hasChunk else len(jobDir)
        # Doubled braces are literal shell braces; {srm} and {idx} are filled below.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
for f in Loop/*/tree*.root
do
    rm Loop/cmsswPreProcessing.root
    ff=`echo $f | cut -d/ -f2`
    ff="${{ff}}_`basename $f | cut -d . -f 1`"
    echo $f
    echo $ff
    export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
    source $VO_CMS_SW_DIR/cmsset_default.sh
    for try in `seq 1 3`; do
        echo "Stageout try $try"
        echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
        /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
        echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
        /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
        if [ $? -ne 0 ]; then
            echo "ERROR: remote copy failed for file $ff"
            continue
        fi
        echo "remote copy succeeded"
        remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
        locsize=$(cat `pwd`/$f | wc -c)
        ok=$(($remsize==$locsize))
        if [ $ok -ne 1 ]; then
            echo "Problem with copy (file sizes don't match), will retry in 30s"
            sleep 30
            continue
        fi
        echo "everything ok"
        rm $f
        echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
        break
    done
done
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
    echo 'ERROR: problem copying job directory back'
else
    echo 'job directory copy succeeded'
fi
""".format(
            idx = jobDir[chunkPos+6:].strip("/") if hasChunk else 'all',
            srm = (remoteDir+jobDir[ jobDir.rfind("/") : chunkPos ]).replace("root://eoscms.cern.ch/","")
            )
    else:
        print("chosen location not supported yet: ", remoteDir)
        # Report the prefix that is actually tested above.
        print('path must start with ' + eosPrefix)
        sys.exit(1)

    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
166 
167 
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Return the bash batch script used for the PSI running mode (SGE, PSI tier3).

    The script creates a scratch work directory, copies the submission
    directory into it, runs the heppy looper and ships the results back.

    index     -- number appended to the remote file names.
    jobDir    -- submission directory; its last path component (up to
                 "_Chunk", if present) names the remote sub-directory.
    remoteDir -- '' to copy everything in Loop/ back to the submission
                 directory, or a path starting with "/pnfs/psi.ch" to copy
                 the Loop/mt2*.root files with gfal-copy first.

    Reads CMSSW_BASE from the environment (KeyError if it is not set). Any
    other remoteDir prints an error and terminates via sys.exit(1).
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms" # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn

    if remoteDir=='':
        cpCmd="""echo 'sending the job directory back'
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # str.find returns -1 when there is no "_Chunk"; slicing with -1 would
        # silently drop the last character of the sample name, so fall back to
        # the end of the string (same rule as the CERN script).
        chunkPos = jobDir.find("_Chunk")
        if chunkPos < 0:
            chunkPos = len(jobDir)
        sampleDir = jobDir[jobDir.rfind("/"):chunkPos]
        # Doubled braces are literal shell braces; {srm} and {idx} are filled below.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/mt2*.root
do
    ff=`basename $f | cut -d . -f 1`
    #d=`echo $f | cut -d / -f 2`
    gfal-mkdir {srm}
    echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
    gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
    if [ $? -ne 0 ]; then
        echo "ERROR: remote copy failed for file $ff"
    else
        echo "remote copy succeeded"
        rm Loop/$ff.root
    fi
done
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm='srm://t3se01.psi.ch'+remoteDir+sampleDir)
    else:
        print("remote directory not supported yet: ", remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)


    # The here-document terminator (EOF) must stay at column 0.
    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
    echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
    exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
    echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
    exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)

    return script
263 
def batchScriptIC(jobDir):
    '''Return the bash batch script used for the IC running mode.

    The script changes into jobDir, sets up the CMSSW runtime from the
    release named by the CMSSW_BASE environment variable, runs the heppy
    looper and moves the contents of Loop/ up into the job directory.

    jobDir -- directory substituted into the script's first "cd" line.

    Raises KeyError if CMSSW_BASE is not set in the environment.
    '''
    release = os.environ['CMSSW_BASE']
    # "{jobdir}" and "{cmssw}" are the substitution fields.
    lines = (
        "#!/bin/bash",
        "export X509_USER_PROXY=/home/hep/$USER/myproxy",
        "source /vols/cms/grid/setup.sh",
        "cd {jobdir}",
        "cd {cmssw}/src",
        "eval `scramv1 ru -sh`",
        "cd -",
        "echo 'running'",
        "python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./ && rm -r Loop",
    )
    template = "\n".join(lines) + "\n"
    return template.format(jobdir=jobDir, cmssw=release)
283 
def batchScriptLocal( remoteDir, index ):
    '''Return the bash script used for the LOCAL running mode (run with nohup).

    The script runs the heppy looper in the current directory and then
    moves the contents of Loop/ up one level.

    Neither remoteDir nor index is used; the returned text is constant.
    '''
    lines = (
        "#!/bin/bash",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./",
    )
    return "\n".join(lines) + "\n"
295 
296 
class MyBatchManager( BatchManager ):
    '''Batch manager that prepares one heppy looper job per split component.

    Relies on module-level names set by the script entry point:
    components (list of split components), options (parsed command-line
    options) and cfgFileName (path of the user configuration file).
    '''

    def PrepareJobUser(self, jobDir, value ):
        '''Populate jobDir with everything one job needs.

        Writes batchScript.sh (content chosen by the running mode), a copy
        of the configuration file as pycfg.py, the pickled component as
        config.pck and, when heppyOptions_ is set on the manager, the
        options as options.json.

        jobDir -- existing directory of the job being prepared.
        value  -- index of the job's component in the global components list.
        '''
        print(value)
        print(components[value])

        # Choose the batch script text for the requested running mode.
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        script = ''  # an unrecognised mode leaves the script empty
        if mode == 'LXPLUS':
            script = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            script = batchScriptPSI ( value, jobDir, storeDir ) # storeDir not implemented at the moment
        elif mode == 'LOCAL':
            script = batchScriptLocal( storeDir, value ) # takes (remoteDir, index), unlike the others
        elif mode == 'PISA' :
            # Pass arguments in signature order (index, remoteDir); both are unused.
            script = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA' :
            script = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            script = batchScriptIC( jobDir )

        scriptFileName = jobDir+'/batchScript.sh'
        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write( script )
        # Equivalent of "chmod +x" without going through a shell, so paths
        # with spaces or shell metacharacters are safe: grant execute
        # wherever read permission is already granted.
        currentMode = os.stat(scriptFileName).st_mode
        os.chmod(scriptFileName, currentMode | ((currentMode & 0o444) >> 2))

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # pickle produces bytes, so the file must be opened in binary mode.
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump( components[value] , cfgFile )

        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
336 
if __name__ == '__main__':
    # Script entry point: parse options, load the user configuration, split
    # its components into jobs, then prepare and submit them.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
    %prog [options] <cfgFile>

    Run Colin's python analysis system on the batch.
    Job splitting is determined by your configuration file.
    """

    options, args = batchManager.ParseOptions()

    # Record extra options as heppy global options: "key=value" stores the
    # value as a string, a bare "key" stores True.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for extra in options.extraOptions:
        name, separator, setting = extra.partition("=")
        _heppyGlobalOptions[name] = setting if separator else True
    batchManager.heppyOptions_=_heppyGlobalOptions

    cfgFileName = args[0]

    # Load the configuration file as module "pycfg" and take its config object.
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
        config = cfo.config

    # Only components that have input files are split into jobs.
    nonEmpty = [comp for comp in config.components if len(comp.files)>0]
    components = split( nonEmpty )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
372 
def batchScriptPADOVA(index, jobDir='./')
Definition: heppy_batch.py:17
def batchScriptIC(jobDir)
Definition: heppy_batch.py:264
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
Definition: heppy_batch.py:46
def batchScriptPSI(index, jobDir, remoteDir='')
Definition: heppy_batch.py:168
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def PrepareJobUser(self, jobDir, value)
Definition: heppy_batch.py:300
def batchScriptLocal(remoteDir, index)
Definition: heppy_batch.py:284
def batchScriptCERN(jobDir, remoteDir='')
Definition: heppy_batch.py:78