CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
batchmanager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from datetime import datetime
4 from optparse import OptionParser
5 
6 import sys
7 import os
8 import re
9 import pprint
10 import time
11 
12 import eostools as castortools
13 
15  """
16  This class manages batch jobs
17  Used in batch scripts
18  Colin Bernet 2008
19  """
20 
21  # constructor
22  # self is this
23  # parse batch manager options
def __init__(self):
    '''Create the batch manager and register its command-line options.

    The command line is not parsed here: this only delegates to
    DefineOptions(), which builds the option parser.
    '''
    self.DefineOptions()
26 
27 
def DefineOptions(self):
    '''Build the option parser and store it in self.parser_.

    Registered options:
      -o / --output-dir   local output directory (dest: outputDir)
      -r / --remote-copy  remote output directory (dest: remoteCopy)
      -f / --force        overwrite without asking (dest: force)
      -n / --negate       create the jobs but do not submit (dest: negate)
      -b / --batch        batch submission command (dest: batch)
    '''
    parser = OptionParser()
    add = parser.add_option

    add("-o", "--output-dir",
        dest="outputDir",
        default=None,
        help="Name of the local output directory for your jobs. This directory will be created automatically.")

    add("-r", "--remote-copy",
        dest="remoteCopy",
        default=None,
        help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy")

    add("-f", "--force",
        dest="force",
        action="store_true",
        default=False,
        help="Don't ask any questions, just over-write")

    # this opt can be removed
    add("-n", "--negate",
        dest="negate",
        action="store_true",
        default=False,
        help="create jobs, but does not submit the jobs.")

    add("-b", "--batch",
        dest="batch",
        default="bsub -q 8nh < ./batchScript.sh",
        help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.")

    self.parser_ = parser
48 
def ParseOptions(self):
    '''Parse the command line and prepare the remote and local output areas.

    Sets self.options_ and self.args_ from the option parser, and
    self.remoteOutputDir_ from the --remote-copy option ("" when the
    option is not given):

    * a remote path containing "psi.ch" must start with /pnfs/psi.ch;
      the directory and an output sub-directory are created with
      gfal-mkdir, and the sub-directory is appended to
      self.remoteOutputDir_;
    * any other remote path is treated as an EOS logical file name and
      is converted and created through the eostools helpers.

    Finally the local output directory is handled by ManageOutputDir().

    Returns the (options, args) pair produced by the parser.

    Exits with status 1 when the remote path is not valid. Raises
    ValueError when the EOS directory already exists and neither
    --negate nor --force was given.
    '''
    (self.options_, self.args_) = self.parser_.parse_args()

    if self.options_.remoteCopy is None:
        self.remoteOutputDir_ = ""
    else:
        # removing possible trailing slash
        self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')

        if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI
            # overwriting protection to be improved
            if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
                print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
                print('%s %s' % (self.remoteOutputDir_, "not valid"))
                sys.exit(1)

            # os.environ.get() returns None (not the string "None") when
            # the variable is unset; only touch the variable if it exists.
            ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
            if ld_lib_path is not None:
                # to solve gfal conflict with CMSSW
                os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path
            try:
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
                outputDir = self.options_.outputDir
                if outputDir is None:
                    # NOTE(review): minute resolution here, while
                    # ManageOutputDir() uses seconds for the local default
                    # name, so the two default names differ - confirm intended.
                    today = datetime.today()
                    outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
                self.remoteOutputDir_ += "/" + outputDir
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            finally:
                if ld_lib_path is not None:
                    # back to original to avoid conflicts
                    os.environ['LD_LIBRARY_PATH'] = ld_lib_path

        else:  # assume EOS
            if not castortools.isLFN(self.remoteOutputDir_):
                print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
                print(self.remoteOutputDir_)
                sys.exit(1)
            self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
            dirExist = castortools.isDirectory(self.remoteOutputDir_)
            if dirExist is False:
                print('%s %s' % ('creating ', self.remoteOutputDir_))
                if castortools.isEOSFile(self.remoteOutputDir_):
                    # the output directory is currently a file..
                    # need to remove it.
                    castortools.rm(self.remoteOutputDir_)
                castortools.createEOSDir(self.remoteOutputDir_)
            else:
                # directory exists.
                if self.options_.negate is False and self.options_.force is False:
                    # COLIN need to reimplement protectedRemove in eostools
                    raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    # NOTE(review): the listing this method was taken from skips one line
    # number right before this call (98 -> 100); confirm that no statement
    # is missing here.
    self.ManageOutputDir()
    return (self.options_, self.args_)
102 
103 
def PrepareJobs(self, listOfValues, listOfDirNames=None):
    '''Prepare one job per entry of listOfValues.

    self.listOfJobs_ is reset, then PrepareJob() is called for every
    value. When listOfDirNames is given, values and directory names are
    paired with zip() and the name is passed on to PrepareJob().
    The resulting list of jobs is pretty-printed at the end.
    '''
    print('PREPARING JOBS ======== ')
    self.listOfJobs_ = []

    if listOfDirNames is not None:
        for value, name in zip(listOfValues, listOfDirNames):
            self.PrepareJob(value, name)
    else:
        for value in listOfValues:
            self.PrepareJob(value)

    print("list of jobs:")
    printer = pprint.PrettyPrinter(indent=4)
    printer.pprint(self.listOfJobs_)
117 
118 
119  # create output dir, if necessary
def ManageOutputDir(self):
    '''Create the local output directory, replacing an existing one.

    The directory name comes from the --output-dir option; when the
    option is not set, a name of the form OutCmsBatch_<timestamp> is
    generated. The absolute path is stored in self.outputDir_.

    If the directory already exists it is removed first. Unless --force
    was given, the user is asked to confirm; answering "n" exits with
    status 1.
    '''
    # Local import: subprocess is not among the module-level imports.
    import subprocess

    outputDir = self.options_.outputDir
    if outputDir is None:
        today = datetime.today()
        outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        print('output directory not specified, using %s' % outputDir)

    self.outputDir_ = os.path.abspath(outputDir)

    if os.path.isdir(self.outputDir_):
        answer = ''
        if not self.options_.force:
            while answer != 'y' and answer != 'n':
                answer = raw_input('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
        if answer == 'n':
            sys.exit(1)
        # Argument list instead of a shell string: a path containing
        # spaces or shell metacharacters must never be split into
        # several "rm -rf" targets.
        subprocess.call(['rm', '-rf', self.outputDir_])

    self.mkdir(self.outputDir_)
147 
148 
def PrepareJob(self, value, dirname=None):
    '''Create the directory of one job and register it.

    The job directory is <self.outputDir_>/<dirname>; when dirname is
    not given, "Job_<value>" is used. The directory is created with
    self.mkdir(), appended to self.listOfJobs_, and then handed to the
    PrepareJobUser() hook together with the value.
    '''
    print('PrepareJob : %s' % value)

    if dirname is None:
        subdir = 'Job_{value}'.format(value=value)
    else:
        subdir = dirname
    jobDir = '/'.join([self.outputDir_, subdir])

    # The Python 2 print statement inserts no separating space after an
    # item ending in a tab, so the tab is followed directly by the path.
    print('\t' + jobDir)

    self.mkdir(jobDir)
    self.listOfJobs_.append(jobDir)
    self.PrepareJobUser(jobDir, value)
163 
def PrepareJobUser(self, jobDir=None, value=None):
    '''Hook allowing user to define how one of his jobs should be prepared.

    PrepareJob() invokes this hook as PrepareJobUser(jobDir, value), so
    the default implementation accepts both arguments. Both are optional
    so that a call passing a single argument keeps working.

    jobDir -- directory created for the job
    value  -- value the job is prepared for

    The default implementation only prints a reminder and ignores its
    arguments.
    '''
    print('\tto be customized')
167 
168 
def SubmitJobs(self, waitingTimeInSec=0):
    '''Submit all jobs. Possibly wait between each job.

    Nothing is submitted when the --negate option is set. Otherwise, for
    every directory in self.listOfJobs_, the process changes into that
    directory, calls SubmitJob(), changes back to the starting
    directory, and sleeps waitingTimeInSec seconds.

    The starting directory is restored even if SubmitJob() raises; the
    exception is then propagated to the caller.
    '''
    if self.options_.negate:
        print('*NOT* SUBMITTING JOBS - exit ')
        return

    print('SUBMITTING JOBS ======== ')
    # Every iteration returns here, so the value is the same for all jobs.
    root = os.getcwd()
    for jobDir in self.listOfJobs_:
        # run it
        print('%s %s' % ('processing ', jobDir))
        os.chdir(jobDir)
        try:
            self.SubmitJob(jobDir)
        finally:
            # and come back, whatever happened during the submission
            os.chdir(root)
        print('waiting %s seconds...' % waitingTimeInSec)
        time.sleep(waitingTimeInSec)
    print('done.')
187 
def SubmitJob(self, jobDir):
    '''Hook for job submission.

    The default implementation announces the job directory and runs the
    command given by the --batch option through os.system().
    '''
    print('%s %s' % ('submitting (to be customized): ', jobDir))
    batchCommand = self.options_.batch
    os.system(batchCommand)
192 
193 
def SubmitJobArray(self, numbOfJobs=1):
    '''Hook for array job submission.

    The default implementation only reports the number of jobs.
    '''
    message = 'Submitting array with %s jobs' % numbOfJobs
    print(message)
197 
def CheckBatchScript(self, batchScript):
    '''Check a batch script and create the directories it copies to.

    Every line of the form "cp ... $jobdir <destination>" is inspected
    (environment variables in <destination> are expanded):

    * if the destination directory already exists, exit with status 2;
    * otherwise create it, unless the --negate option is set.

    An empty script name is accepted and nothing is done. Exits with
    status 3 when the script does not exist or cannot be opened.
    '''
    if batchScript == '':
        return

    if not os.path.isfile(batchScript):
        print('%s %s %s' % ('file ', batchScript, ' does not exist'))
        sys.exit(3)

    # Raw string, compiled once instead of once per line.
    copyPattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")

    try:
        ifile = open(batchScript)
    except (IOError, OSError):
        print('cannot open input %s' % batchScript)
        sys.exit(3)

    # The with-block closes the script on every exit path, including
    # sys.exit() below.
    with ifile:
        for line in ifile:
            m = copyPattern.match(line)
            if not m:
                continue
            destination = os.path.expandvars(m.group(1))
            if os.path.isdir(destination):
                print('%s %s %s' % ('output directory ', m.group(1), 'already exists!'))
                print('exiting')
                sys.exit(2)
            if not self.options_.negate:
                os.mkdir(destination)
            else:
                print('%s %s' % ('not making dir', self.options_.negate))
226 
227  # create a directory
def mkdir(self, dirname):
    '''Create directory dirname, including missing parent directories.

    An already existing directory is not an error. When the directory
    cannot be created (for example because a file with that name is in
    the way), a message is printed and the process exits with status 4.
    '''
    try:
        # No shell involved, so names with spaces or shell
        # metacharacters are handled correctly.
        os.makedirs(dirname)
    except OSError:
        # makedirs also fails when the directory is already there;
        # only a path that still is not a directory is a real failure.
        if not os.path.isdir(dirname):
            print('%s %s' % ('please remove or rename directory: ', dirname))
            sys.exit(4)
235 
236 
def RunningMode(self, batch):
    '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

    "LXPLUS" : batch command is bsub, and logged on lxplus
    "PSI"    : batch command is qsub, and logged to t3uiXX
    "NAF"    : batch command is qsub, and logged on naf
    "IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
    "LOCAL"  : batch command is nohup or ./batchScript.sh

    batch -- the full batch command line; only its first word is used.

    In all other cases (unknown command, or command not usable from the
    current host), a ValueError is raised.
    '''
    # Local import: socket is not among the module-level imports.
    import socket

    # HOSTNAME is not exported in every environment; fall back to the
    # name reported by the system instead of failing with a KeyError.
    hostName = os.environ.get('HOSTNAME') or socket.gethostname()

    onLxplus = hostName.startswith('lxplus')
    onPSI = hostName.startswith('t3ui')
    onNAF = hostName.startswith('naf')
    # Domain taken from the description of the "IC" mode above.
    onIC = 'hep.ph.ic.ac.uk' in hostName

    # An empty command line ends up in the "unknown batch command" error.
    words = batch.split()
    batchCmd = words[0] if words else ''

    if batchCmd == 'bsub':
        if not onLxplus:
            err = 'Cannot run %s on %s' % (batchCmd, hostName)
            raise ValueError(err)
        print('running on LSF : %s from %s' % (batchCmd, hostName))
        return 'LXPLUS'

    elif batchCmd == "qsub":
        if onPSI:
            print('running on SGE : %s from %s' % (batchCmd, hostName))
            return 'PSI'
        elif onNAF:
            print('running on NAF : %s from %s' % (batchCmd, hostName))
            return 'NAF'
        elif onIC:
            print('running on IC : %s from %s' % (batchCmd, hostName))
            return 'IC'
        else:
            err = 'Cannot run %s on %s' % (batchCmd, hostName)
            raise ValueError(err)

    elif batchCmd == 'nohup' or batchCmd == './batchScript.sh':
        print('running locally : %s on %s' % (batchCmd, hostName))
        return 'LOCAL'

    else:
        err = 'unknown batch command: X%sX' % batchCmd
        raise ValueError(err)
tuple zip
Definition: archive.py:476
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
if(conf.exists("allCellsPositionCalc"))