CMS 3D CMS Logo

heppy_batch.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 from __future__ import print_function
4 from builtins import range
5 import sys
6 import imp
7 import copy
8 import os
9 import shutil
10 import pickle
11 import json
12 import math
13 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
14 
15 from PhysicsTools.HeppyCore.framework.heppy_loop import split
16 
17 def batchScriptPADOVA( index, jobDir='./'):
18  '''prepare the LSF version of the batch script, to run on LSF'''
19  script = """#!/bin/bash
20 #BSUB -q local
21 #BSUB -J test
22 #BSUB -o test.log
23 cd {jdir}
24 echo 'PWD:'
25 pwd
26 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
27 source $VO_CMS_SW_DIR/cmsset_default.sh
28 echo 'environment:'
29 echo
30 env > local.env
31 env
32 # ulimit -v 3000000 # NO
33 echo 'copying job dir to worker'
34 eval `scram runtime -sh`
35 ls
36 echo 'running'
37 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
38 exit $?
39 #echo
40 #echo 'sending the job directory back'
41 #echo cp -r Loop/* $LS_SUBCWD
42 """.format(jdir=jobDir)
43 
44  return script
45 
46 def batchScriptPISA( index, remoteDir=''):
47  '''prepare the LSF version of the batch script, to run on LSF'''
48  script = """#!/bin/bash
49 #BSUB -q cms
50 echo 'PWD:'
51 pwd
52 export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
53 source $VO_CMS_SW_DIR/cmsset_default.sh
54 echo 'environment:'
55 echo
56 env > local.env
57 env
58 # ulimit -v 3000000 # NO
59 echo 'copying job dir to worker'
60 ###cd $CMSSW_BASE/src
61 eval `scramv1 runtime -sh`
62 #eval `scramv1 ru -sh`
63 # cd $LS_SUBCWD
64 # eval `scramv1 ru -sh`
65 ##cd -
66 ##cp -rf $LS_SUBCWD .
67 ls
68 echo `find . -type d | grep /`
69 echo 'running'
70 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
71 exit $?
72 #echo
73 #echo 'sending the job directory back'
74 #echo cp -r Loop/* $LS_SUBCWD
75 """
76  return script
77 
78 def batchScriptCERN( jobDir, remoteDir=''):
79  '''prepare the LSF version of the batch script, to run on LSF'''
80 
81  dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
82 rm Loop/cmsswPreProcessing.root
83 cp -r Loop/* $LS_SUBCWD
84 if [ $? -ne 0 ]; then
85  echo 'ERROR: problem copying job directory back'
86 else
87  echo 'job directory copy succeeded'
88 fi"""
89 
90  if remoteDir=='':
91  cpCmd=dirCopy
92  elif remoteDir.startswith("root://eoscms.cern.ch//eos/cms/store/"):
93  cpCmd="""echo 'sending root files to remote dir'
94 export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
95 for f in Loop/*/tree*.root
96 do
97  rm Loop/cmsswPreProcessing.root
98  ff=`echo $f | cut -d/ -f2`
99  ff="${{ff}}_`basename $f | cut -d . -f 1`"
100  echo $f
101  echo $ff
102  export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
103  source $VO_CMS_SW_DIR/cmsset_default.sh
104  for try in `seq 1 3`; do
105  echo "Stageout try $try"
106  echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
107  /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
108  echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
109  /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
110  if [ $? -ne 0 ]; then
111  echo "ERROR: remote copy failed for file $ff"
112  continue
113  fi
114  echo "remote copy succeeded"
115  remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
116  locsize=$(cat `pwd`/$f | wc -c)
117  ok=$(($remsize==$locsize))
118  if [ $ok -ne 1 ]; then
119  echo "Problem with copy (file sizes don't match), will retry in 30s"
120  sleep 30
121  continue
122  fi
123  echo "everything ok"
124  rm $f
125  echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
126  break
127  done
128 done
129 cp -r Loop/* $LS_SUBCWD
130 if [ $? -ne 0 ]; then
131  echo 'ERROR: problem copying job directory back'
132 else
133  echo 'job directory copy succeeded'
134 fi
135 """.format(
136  idx = jobDir[jobDir.find("_Chunk")+6:].strip("/") if '_Chunk' in jobDir else 'all',
137  srm = (""+remoteDir+jobDir[ jobDir.rfind("/") : (jobDir.find("_Chunk") if '_Chunk' in jobDir else len(jobDir)) ]).replace("root://eoscms.cern.ch/","")
138  )
139  else:
140  print("chosen location not supported yet: ", remoteDir)
141  print('path must start with /store/')
142  sys.exit(1)
143 
144  script = """#!/bin/bash
145 #BSUB -q 8nm
146 echo 'environment:'
147 echo
148 env | sort
149 # ulimit -v 3000000 # NO
150 echo 'copying job dir to worker'
151 cd $CMSSW_BASE/src
152 eval `scramv1 ru -sh`
153 # cd $LS_SUBCWD
154 # eval `scramv1 ru -sh`
155 cd -
156 cp -rf $LS_SUBCWD .
157 ls
158 cd `find . -type d | grep /`
159 echo 'running'
160 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
161 echo
162 {copy}
163 """.format(copy=cpCmd)
164 
165  return script
166 
167 
168 def batchScriptPSI( index, jobDir, remoteDir=''):
169  '''prepare the SGE version of the batch script, to run on the PSI tier3 batch system'''
170 
171  cmssw_release = os.environ['CMSSW_BASE']
172  VO_CMS_SW_DIR = "/swshare/cms" # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn
173 
174  if remoteDir=='':
175  cpCmd="""echo 'sending the job directory back'
176 rm Loop/cmsswPreProcessing.root
177 cp -r Loop/* $SUBMISIONDIR"""
178  elif remoteDir.startswith("/pnfs/psi.ch"):
179  cpCmd="""echo 'sending root files to remote dir'
180 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
181 for f in Loop/mt2*.root
182 do
183  ff=`basename $f | cut -d . -f 1`
184  #d=`echo $f | cut -d / -f 2`
185  gfal-mkdir {srm}
186  echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
187  gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
188  if [ $? -ne 0 ]; then
189  echo "ERROR: remote copy failed for file $ff"
190  else
191  echo "remote copy succeeded"
192  rm Loop/$ff.root
193  fi
194 done
195 rm Loop/cmsswPreProcessing.root
196 cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm='srm://t3se01.psi.ch'+remoteDir+jobDir[jobDir.rfind("/"):jobDir.find("_Chunk")])
197  else:
198  print("remote directory not supported yet: ", remoteDir)
199  print('path must start with "/pnfs/psi.ch"')
200  sys.exit(1)
201 
202 
203  script = """#!/bin/bash
204 shopt expand_aliases
205 ##### MONITORING/DEBUG INFORMATION ###############################
206 DATE_START=`date +%s`
207 echo "Job started at " `date`
208 cat <<EOF
209 ################################################################
210 ## QUEUEING SYSTEM SETTINGS:
211 HOME=$HOME
212 USER=$USER
213 JOB_ID=$JOB_ID
214 JOB_NAME=$JOB_NAME
215 HOSTNAME=$HOSTNAME
216 TASK_ID=$TASK_ID
217 QUEUE=$QUEUE
218 
219 EOF
220 echo "######## Environment Variables ##########"
221 env
222 echo "################################################################"
223 TOPWORKDIR=/scratch/`whoami`
224 JOBDIR=sgejob-$JOB_ID
225 WORKDIR=$TOPWORKDIR/$JOBDIR
226 SUBMISIONDIR={jdir}
227 if test -e "$WORKDIR"; then
228  echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
229  exit 1
230 fi
231 mkdir -p $WORKDIR
232 if test ! -d "$WORKDIR"; then
233  echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
234  exit 1
235 fi
236 
237 #source $VO_CMS_SW_DIR/cmsset_default.sh
238 source {vo}/cmsset_default.sh
239 export SCRAM_ARCH=slc6_amd64_gcc481
240 #cd $CMSSW_BASE/src
241 cd {cmssw}/src
242 shopt -s expand_aliases
243 cmsenv
244 cd $WORKDIR
245 cp -rf $SUBMISIONDIR .
246 ls
247 cd `find . -type d | grep /`
248 echo 'running'
249 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
250 #python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
251 echo
252 {copy}
253 ###########################################################################
254 DATE_END=`date +%s`
255 RUNTIME=$((DATE_END-DATE_START))
256 echo "################################################################"
257 echo "Job finished at " `date`
258 echo "Wallclock running time: $RUNTIME s"
259 exit 0
260 """.format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)
261 
262  return script
263 
264 def batchScriptIC(jobDir):
265  '''prepare a IC version of the batch script'''
266 
267 
268  cmssw_release = os.environ['CMSSW_BASE']
269  script = """#!/bin/bash
270 export X509_USER_PROXY=/home/hep/$USER/myproxy
271 source /vols/cms/grid/setup.sh
272 cd {jobdir}
273 cd {cmssw}/src
274 eval `scramv1 ru -sh`
275 cd -
276 echo 'running'
277 python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
278 echo
279 echo 'sending the job directory back'
280 mv Loop/* ./ && rm -r Loop
281 """.format(jobdir = jobDir,cmssw = cmssw_release)
282  return script
283 
284 def batchScriptLocal( remoteDir, index ):
285  '''prepare a local version of the batch script, to run using nohup'''
286 
287  script = """#!/bin/bash
288 echo 'running'
289 python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
290 echo
291 echo 'sending the job directory back'
292 mv Loop/* ./
293 """
294  return script
295 
296 
297 class MyBatchManager( BatchManager ):
298  '''Batch manager specific to cmsRun processes.'''
299 
300  def PrepareJobUser(self, jobDir, value ):
301  '''Prepare one job. This function is called by the base class.'''
302  print(value)
303  print(components[value])
304 
305  #prepare the batch script
306  scriptFileName = jobDir+'/batchScript.sh'
307  scriptFile = open(scriptFileName,'w')
308  storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
309  mode = self.RunningMode(options.batch)
310  if mode == 'LXPLUS':
311  scriptFile.write( batchScriptCERN( jobDir, storeDir ) )
312  elif mode == 'PSI':
313  scriptFile.write( batchScriptPSI ( value, jobDir, storeDir ) ) # storeDir not implemented at the moment
314  elif mode == 'LOCAL':
315  scriptFile.write( batchScriptLocal( storeDir, value) ) # watch out arguments are swapped (although not used)
316  elif mode == 'PISA' :
317  scriptFile.write( batchScriptPISA( storeDir, value) )
318  elif mode == 'PADOVA' :
319  scriptFile.write( batchScriptPADOVA( value, jobDir) )
320  elif mode == 'IC':
321  scriptFile.write( batchScriptIC(jobDir) )
322  scriptFile.close()
323  os.system('chmod +x %s' % scriptFileName)
324 
325  shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')
326 # jobConfig = copy.deepcopy(config)
327 # jobConfig.components = [ components[value] ]
328  cfgFile = open(jobDir+'/config.pck','w')
329  pickle.dump( components[value] , cfgFile )
330  # pickle.dump( cfo, cfgFile )
331  cfgFile.close()
332  if hasattr(self,"heppyOptions_"):
333  optjsonfile = open(jobDir+'/options.json','w')
334  optjsonfile.write(json.dumps(self.heppyOptions_))
335  optjsonfile.close()
336 
337 if __name__ == '__main__':
338  batchManager = MyBatchManager()
339  batchManager.parser_.usage="""
340  %prog [options] <cfgFile>
341 
342  Run Colin's python analysis system on the batch.
343  Job splitting is determined by your configuration file.
344  """
345 
346  options, args = batchManager.ParseOptions()
347 
348  from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
349  for opt in options.extraOptions:
350  if "=" in opt:
351  (key,val) = opt.split("=",1)
352  _heppyGlobalOptions[key] = val
353  else:
354  _heppyGlobalOptions[opt] = True
355  batchManager.heppyOptions_=_heppyGlobalOptions
356 
357  cfgFileName = args[0]
358 
359  handle = open(cfgFileName, 'r')
360  # import pdb; pdb.set_trace()
361  cfo = imp.load_source("pycfg", cfgFileName, handle)
362  config = cfo.config
363  handle.close()
364 
365  components = split( [comp for comp in config.components if len(comp.files)>0] )
366  listOfValues = list(range(0, len(components)))
367  listOfNames = [comp.name for comp in components]
368 
369  batchManager.PrepareJobs( listOfValues, listOfNames )
370  waitingTime = 0.1
371  batchManager.SubmitJobs( waitingTime )
372 
def batchScriptPADOVA(index, jobDir='./')
Definition: heppy_batch.py:17
def batchScriptIC(jobDir)
Definition: heppy_batch.py:264
def replace(string, replacements)
def batchScriptPISA(index, remoteDir='')
Definition: heppy_batch.py:46
def batchScriptPSI(index, jobDir, remoteDir='')
Definition: heppy_batch.py:168
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def PrepareJobUser(self, jobDir, value)
Definition: heppy_batch.py:300
def batchScriptLocal(remoteDir, index)
Definition: heppy_batch.py:284
double split
Definition: MVATrainer.cc:139
def batchScriptCERN(jobDir, remoteDir='')
Definition: heppy_batch.py:78
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run