CMS 3D CMS Logo

batchmanager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from datetime import datetime
4 from optparse import OptionParser
5 
6 import sys
7 import os
8 import re
9 import pprint
10 import time
11 
12 import eostools as castortools
13 
15  """
16  This class manages batch jobs
17  Used in batch scripts
18  Colin Bernet 2008
19  """
20 
# constructor
# `self` is the instance (what other languages call `this`)
# defines the batch manager options; they are parsed later, by ParseOptions()
def __init__(self):
    '''Set up the command-line option parser.

    Only declares the options, by delegating to DefineOptions(); nothing
    is parsed here.
    '''
    self.DefineOptions()
26 
27 
def DefineOptions(self):
    '''Create self.parser_ and declare the command-line options.

    Declared options (destination attribute in parentheses):
      -o / --output-dir   (outputDir)     local output directory
      -r / --remote-copy  (remoteCopy)    remote output directory
      -f / --force        (force)         do not ask before overwriting
      -n / --negate       (negate)        create the jobs, do not submit
      -b / --batch        (batch)         batch submission command
      --option            (extraOptions)  repeatable, collected in a list
    '''
    # define options and arguments ====================================
    parser = OptionParser()

    parser.add_option(
        "-o", "--output-dir",
        dest="outputDir",
        default=None,
        help="Name of the local output directory for your jobs. "
             "This directory will be created automatically.")

    parser.add_option(
        "-r", "--remote-copy",
        dest="remoteCopy",
        default=None,
        help="remote output directory for your jobs. Example: "
             "/store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. "
             "This directory *must* be provided as a logical file name (LFN). "
             "When this option is used, all root files produced by a job are "
             "copied to the remote directory, and the job index is appended "
             "to the root file name. The Logger directory will be sent back "
             "to the submission directory. For remote copy to PSI specify "
             "path like: '/pnfs/psi.ch/...'. Note: environment variable "
             "X509_USER_PROXY must point to home area before renewing proxy")

    parser.add_option(
        "-f", "--force",
        action="store_true",
        dest="force",
        default=False,
        help="Don't ask any questions, just over-write")

    # this opt can be removed
    parser.add_option(
        "-n", "--negate",
        action="store_true",
        dest="negate",
        default=False,
        help="create jobs, but does not submit the jobs.")

    # '%default' is expanded by optparse, so the help text can never
    # disagree with the actual default value again.
    parser.add_option(
        "-b", "--batch",
        dest="batch",
        default="bsub -q 8nh < ./batchScript.sh",
        help="batch command. default is: '%default'. You can also use "
             "'nohup < ./batchScript.sh &' to run locally.")

    parser.add_option(
        "--option",
        dest="extraOptions",
        type="string",
        action="append",
        default=[],
        help="Save one extra option (either a flag, or a key=value pair) "
             "that can be then accessed from the job config file")

    self.parser_ = parser
54 
def ParseOptions(self):
    '''Parse the command line and prepare the output directories.

    Stores the parsed result in self.options_ / self.args_ and sets
    self.remoteOutputDir_:
      * "" when no --remote-copy was given;
      * for a path containing "psi.ch": the path itself plus "/" and the
        local output directory name (or a generated 'OutCmsBatch_<date>'
        name); both levels are created with gfal-mkdir;
      * otherwise the path is treated as an LFN, converted with
        castortools.lfnToEOS and created if it is not yet a directory.
    Finally the local output directory is handled by ManageOutputDir().

    Returns:
        the tuple (self.options_, self.args_).

    Raises:
        ValueError: the EOS directory already exists and neither --force
            nor --negate was given.
        SystemExit: exit status 1 when the remote directory is not valid
            (PSI path not under /pnfs/psi.ch, or a non-LFN EOS path).
    '''
    (self.options_, self.args_) = self.parser_.parse_args()

    if self.options_.remoteCopy is None:
        self.remoteOutputDir_ = ""
    else:
        # removing possible trailing slash
        self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')

        if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI
            # overwriting protection to be improved
            if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
                print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
                print('%s not valid' % (self.remoteOutputDir_,))
                sys.exit(1)

            # os.environ.get returns None (not the string "None") when the
            # variable is unset; in that case the environment is left alone.
            ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
            if ld_lib_path is not None:
                # to solve gfal conflict with CMSSW
                os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path
            try:
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
                outputDir = self.options_.outputDir
                if outputDir is None:
                    today = datetime.today()
                    outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
                self.remoteOutputDir_ += "/" + outputDir
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            finally:
                if ld_lib_path is not None:
                    # back to original to avoid conflicts
                    os.environ['LD_LIBRARY_PATH'] = ld_lib_path

        else:  # assume EOS
            if not castortools.isLFN(self.remoteOutputDir_):
                print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
                print(self.remoteOutputDir_)
                sys.exit(1)
            self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
            dirExist = castortools.isDirectory(self.remoteOutputDir_)
            if dirExist is False:
                print('creating  %s' % (self.remoteOutputDir_,))
                if castortools.isEOSFile(self.remoteOutputDir_):
                    # the output directory is currently a file..
                    # need to remove it.
                    castortools.rm(self.remoteOutputDir_)
                castortools.createEOSDir(self.remoteOutputDir_)
            else:
                # directory exists.
                if self.options_.negate is False and self.options_.force is False:
                    # COLIN need to reimplement protectedRemove in eostools
                    raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    # NOTE(review): the listing this code was taken from skips one source
    # line right before this call - confirm against the real file that no
    # statement is missing here.
    self.ManageOutputDir()
    return (self.options_, self.args_)
108 
109 
def PrepareJobs(self, listOfValues, listOfDirNames=None):
    '''Prepare one job per entry of listOfValues.

    self.listOfJobs_ is reset to an empty list, then PrepareJob is called
    for every value. When listOfDirNames is given, values and directory
    names are paired with zip (so pairing stops at the shorter sequence)
    and the name is passed on to PrepareJob. The resulting job list is
    pretty-printed at the end.
    '''
    print('PREPARING JOBS ======== ')
    self.listOfJobs_ = []

    if listOfDirNames is not None:
        for jobValue, jobName in zip(listOfValues, listOfDirNames):
            self.PrepareJob(jobValue, jobName)
    else:
        for jobValue in listOfValues:
            self.PrepareJob(jobValue)

    print("list of jobs:")
    pprint.PrettyPrinter(indent=4).pprint(self.listOfJobs_)
123 
124 
125  # create output dir, if necessary
def ManageOutputDir(self):
    '''Choose, clean up and create the local output directory.

    The directory name comes from self.options_.outputDir; when that is
    None a name of the form 'OutCmsBatch_<date>' is generated. The
    absolute path is stored in self.outputDir_.

    If the directory already exists the user is asked for confirmation
    (skipped with --force); answering 'n' exits with status 1, otherwise
    the directory is removed. The directory is then created through
    self.mkdir().
    '''
    # local import: shutil is only needed here
    import shutil

    outputDir = self.options_.outputDir
    if outputDir is None:
        today = datetime.today()
        outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        print('output directory not specified, using %s' % outputDir)

    self.outputDir_ = os.path.abspath(outputDir)

    if os.path.isdir(self.outputDir_):
        answer = ''
        if not self.options_.force:
            try:
                ask = raw_input  # Python 2
            except NameError:
                ask = input      # Python 3
            while answer != 'y' and answer != 'n':
                answer = ask('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
        if answer == 'n':
            sys.exit(1)
        # Same effect as `rm -rf <dir>`, but without building a shell
        # command: a path containing spaces or shell metacharacters can
        # no longer make the removal hit the wrong files.
        if os.path.islink(self.outputDir_):
            os.unlink(self.outputDir_)
        else:
            shutil.rmtree(self.outputDir_, ignore_errors=True)

    self.mkdir(self.outputDir_)
153 
154 
def PrepareJob(self, value, dirname=None):
    '''Prepare a job for a given value.

    The job directory is self.outputDir_ joined with dirname, or with
    'Job_<value>' when no dirname is given. The directory is created with
    self.mkdir, appended to self.listOfJobs_ and then handed, together
    with the value, to PrepareJobUser, which should be overloaded by the
    user.
    '''
    print('PrepareJob : %s' % value)

    subDir = 'Job_{value}'.format(value=value) if dirname is None else dirname
    jobPath = '/'.join((self.outputDir_, subDir))
    print('\t' + jobPath)

    self.mkdir(jobPath)
    self.listOfJobs_.append(jobPath)
    self.PrepareJobUser(jobPath, value)
169 
def PrepareJobUser(self, jobDir, value=None):
    '''Hook allowing user to define how one of his jobs should be prepared.

    PrepareJob calls this hook as PrepareJobUser(jobDir, value), so the
    signature accepts both arguments; `value` is optional so that a call
    with a single positional argument keeps working. This default
    implementation only prints a reminder.
    '''
    # '\t' is a tab: the word "to" needs its own 't' after the escape.
    print('\tto be customized')
173 
174 
def SubmitJobs(self, waitingTimeInSec=0):
    '''Submit all jobs. Possibly wait between each job.

    Does nothing except print a notice when self.options_.negate is set.
    Otherwise, for every directory in self.listOfJobs_, changes into that
    directory, calls SubmitJob(jobDir), returns to the previous working
    directory and sleeps waitingTimeInSec seconds.

    The previous working directory is restored even when SubmitJob
    raises; the exception is then propagated to the caller.
    '''
    if self.options_.negate:
        print('*NOT* SUBMITTING JOBS - exit ')
        return

    print('SUBMITTING JOBS ======== ')
    for jobDir in self.listOfJobs_:
        root = os.getcwd()
        # run it
        print('processing  %s' % (jobDir,))
        os.chdir(jobDir)
        try:
            self.SubmitJob(jobDir)
        finally:
            # and come back, whatever happened during the submission
            os.chdir(root)
        print('waiting %s seconds...' % waitingTimeInSec)
        time.sleep(waitingTimeInSec)
        print('done.')
193 
def SubmitJob(self, jobDir):
    '''Hook for job submission.

    This default implementation announces the job directory and runs the
    command stored in self.options_.batch through os.system.
    '''
    announcement = ' '.join(('submitting (to be customized): ', str(jobDir)))
    print(announcement)
    batchCommand = self.options_.batch
    os.system(batchCommand)
198 
199 
def SubmitJobArray(self, numbOfJobs=1):
    '''Hook for array job submission.

    This default implementation only prints the number of jobs in the
    array.
    '''
    message = 'Submitting array with %s jobs' % numbOfJobs
    print(message)
203 
def CheckBatchScript(self, batchScript):
    '''Check the batch script and create the directories it copies to.

    An empty file name is accepted and nothing is done. Otherwise every
    line of the script matching `cp ... $jobdir <target>` is examined:
    <target> (after environment-variable expansion) must not exist yet
    and is created, unless self.options_.negate is set.

    Exits through sys.exit with status
      3 when the script does not exist or cannot be opened,
      2 when a target directory already exists.
    '''
    if batchScript == '':
        return

    if not os.path.isfile(batchScript):
        print('file  %s  does not exist' % (batchScript,))
        sys.exit(3)

    # compiled once, outside the loop over the lines
    copyPattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")

    try:
        ifile = open(batchScript)
    except (IOError, OSError):
        print('cannot open input %s' % batchScript)
        sys.exit(3)

    # the context manager closes the script, also on sys.exit below
    with ifile:
        for line in ifile:
            m = copyPattern.match(line)
            if not m:
                continue
            target = os.path.expandvars(m.group(1))
            if os.path.isdir(target):
                print('output directory  %s already exists!' % (m.group(1),))
                print('exiting')
                sys.exit(2)
            if self.options_.negate == False:
                os.mkdir(target)
            else:
                print('not making dir %s' % (self.options_.negate,))
232 
233  # create a directory
def mkdir(self, dirname):
    '''Create dirname, including any missing parent directories.

    An already existing directory is accepted silently. When the
    directory cannot be created, a message is printed and the program
    exits with status 4.

    os.makedirs is used rather than a `mkdir -p` shell command, so names
    containing spaces or shell metacharacters are created literally.
    '''
    try:
        os.makedirs(dirname)
    except OSError:
        # os.makedirs also fails when the directory is already there;
        # only a path that still is not a directory is a real error.
        if not os.path.isdir(dirname):
            print('please remove or rename directory:  %s' % (dirname,))
            sys.exit(4)
241 
242 
def RunningMode(self, batch):
    '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

    "LXPLUS" : batch command is bsub, and logged on lxplus
    "PSI" : batch command is qsub, and logged to t3uiXX
    "NAF" : batch command is qsub, and logged on naf
    "IC" : batch command is qsub, and logged on hep.ph.ic.ac.uk
    "LOCAL" : batch command is nohup or ./batchScript.sh

    The batch command is the first word of `batch`; the host name is
    read from the HOSTNAME environment variable.

    Raises:
        ValueError: the batch command is unknown, or cannot be used from
            the current host.
        KeyError: HOSTNAME is not set in the environment.
    '''
    hostName = os.environ['HOSTNAME']

    onLxplus = hostName.startswith('lxplus')
    onPSI = hostName.startswith('t3ui')
    onNAF = hostName.startswith('naf')
    # hosts of the hep.ph.ic.ac.uk domain
    onIC = 'hep.ph.ic.ac.uk' in hostName

    batchCmd = batch.split()[0]

    if batchCmd == 'bsub':
        if not onLxplus:
            raise ValueError('Cannot run %s on %s' % (batchCmd, hostName))
        print('running on LSF : %s from %s' % (batchCmd, hostName))
        return 'LXPLUS'

    if batchCmd == "qsub":
        if onPSI:
            print('running on SGE : %s from %s' % (batchCmd, hostName))
            return 'PSI'
        if onNAF:
            print('running on NAF : %s from %s' % (batchCmd, hostName))
            return 'NAF'
        if onIC:
            print('running on IC : %s from %s' % (batchCmd, hostName))
            return 'IC'
        raise ValueError('Cannot run %s on %s' % (batchCmd, hostName))

    if batchCmd == 'nohup' or batchCmd == './batchScript.sh':
        print('running locally : %s on %s' % (batchCmd, hostName))
        return 'LOCAL'

    raise ValueError('unknown batch command: X%sX' % batchCmd)
def mkdir(self, dirname)
def PrepareJob(self, value, dirname=None)
def SubmitJobArray(self, numbOfJobs=1)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def PrepareJobs(self, listOfValues, listOfDirNames=None)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def CheckBatchScript(self, batchScript)
if(dp >Float(M_PI)) dp-
def SubmitJobs(self, waitingTimeInSec=0)
def SubmitJob(self, jobDir)
def RunningMode(self, batch)
def PrepareJobUser(self, value)