CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
batchmanager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from datetime import datetime
4 from optparse import OptionParser
5 
6 import sys
7 import os
8 import re
9 import pprint
10 import time
11 
12 
14  """
15  This class manages batch jobs
16  Used in batch scripts
17  Colin Bernet 2008
18  """
19 
20  # constructor
21  # self is this
22  # parse batch manager options
def __init__(self):
    """Build the batch manager by declaring its command-line options.

    Nothing is parsed at construction time; the options are only declared
    (see DefineOptions).
    """
    self.DefineOptions()
25 
26 
def DefineOptions(self):
    """Create the option parser (self.parser_) and declare its options.

    Declared options, in help order:
      -o / --output-dir   -> outputDir  (default None)
      -r / --remote-copy  -> remoteCopy (default None)
      -f / --force        -> force      (flag, default False)
      -n / --negate       -> negate     (flag, default False)
      -b / --batch        -> batch      (default: bsub on the 8nh queue)
    """
    # One entry per option: (flags, keyword settings). The order of the
    # entries is the order in which the options show up in --help.
    optionTable = [
        (("-o", "--output-dir"),
         dict(dest="outputDir",
              help="Name of the local output directory for your jobs. This directory will be created automatically.",
              default=None)),
        (("-r", "--remote-copy"),
         dict(dest="remoteCopy",
              help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory is tarred and compressed into Logger.tgz, and sent to the remote output directory as well. Afterwards, use logger.py to access the information contained in Logger.tgz. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Logs will be sent back to the submision directory.",
              default=None)),
        (("-f", "--force"),
         dict(action="store_true",
              dest="force", default=False,
              help="Don't ask any questions, just over-write")),
        # this opt can be removed
        (("-n", "--negate"),
         dict(action="store_true",
              dest="negate", default=False,
              help="create jobs, but does not submit the jobs.")),
        (("-b", "--batch"),
         dict(dest="batch",
              help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
              default="bsub -q 8nh < ./batchScript.sh")),
    ]

    self.parser_ = OptionParser()
    for flags, settings in optionTable:
        self.parser_.add_option(*flags, **settings)
47 
48 
def ParseOptions(self):
    """Parse the command line and set up the remote and local output areas.

    Fills self.options_ and self.args_ from self.parser_, derives
    self.remoteOutputDir_ from the remoteCopy option (empty string when the
    option was not given), then calls ManageOutputDir().

    Returns the (options, args) pair produced by the parser.
    Exits with status 1 when the remote directory is not acceptable, and
    raises ValueError when the EOS directory already exists while both the
    negate and force options are False.
    """
    self.options_, self.args_ = self.parser_.parse_args()
    opts = self.options_

    if opts.remoteCopy is None:
        self.remoteOutputDir_ = ""
    else:
        # only needed (and only imported) when a remote copy is requested
        import CMGTools.Production.eostools as castortools
        # removing possible trailing slash
        self.remoteOutputDir_ = opts.remoteCopy.rstrip('/')

        if "psi.ch" not in self.remoteOutputDir_:
            # assume EOS
            if not castortools.isLFN(self.remoteOutputDir_):
                print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
                print(self.remoteOutputDir_)
                sys.exit(1)
            self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
            if castortools.isDirectory(self.remoteOutputDir_) is False:
                print('creating  %s' % (self.remoteOutputDir_,))
                if castortools.isEOSFile(self.remoteOutputDir_):
                    # the output directory is currently a file: remove it first
                    castortools.rm(self.remoteOutputDir_)
                castortools.createEOSDir(self.remoteOutputDir_)
            elif opts.negate is False and opts.force is False:
                # directory exists.
                # COLIN need to reimplement protectedRemove in eostools
                raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))
        elif not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
            print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
            print('%s not valid' % (self.remoteOutputDir_,))
            sys.exit(1)
        else:
            # T3 @ PSI; overwriting protection to be improved
            gfalMkdir = "gfal-mkdir srm://t3se01.psi.ch/"
            os.system(gfalMkdir + self.remoteOutputDir_)
            subDir = opts.outputDir
            if subDir is None:
                subDir = 'OutCmsBatch_%s' % datetime.today().strftime("%d%h%y_%H%M")
            self.remoteOutputDir_ += "/" + subDir
            os.system(gfalMkdir + self.remoteOutputDir_)

    self.ManageOutputDir()
    return (self.options_, self.args_)
98 
99 
def PrepareJobs(self, listOfValues, listOfDirNames=None):
    """Prepare one job per value, then print the list of job directories.

    listOfValues   : iterable of values, each handed to PrepareJob.
    listOfDirNames : optional iterable of directory names, paired with the
                     values through zip (so extra entries on either side
                     are ignored). When None, PrepareJob is called with the
                     value only.

    self.listOfJobs_ is reset to an empty list before the jobs are prepared.
    """
    print('PREPARING JOBS ======== ')
    self.listOfJobs_ = []

    if listOfDirNames is not None:
        for jobValue, jobName in zip(listOfValues, listOfDirNames):
            self.PrepareJob(jobValue, jobName)
    else:
        for jobValue in listOfValues:
            self.PrepareJob(jobValue)

    print("list of jobs:")
    pprint.pprint(self.listOfJobs_, indent=4)
113 
114 
115  # create output dir, if necessary
def ManageOutputDir(self):
    """Choose, clean and create the local output directory.

    If the outputDir option is None, a name of the form
    'OutCmsBatch_<timestamp>' is generated. The absolute path is stored in
    self.outputDir_.

    If that directory already exists, the user is asked for confirmation
    (skipped when the force option is set); answering 'n' exits with
    status 1, otherwise the existing directory is removed. Finally the
    directory is (re)created through self.mkdir.
    """
    # Local import: the removal no longer goes through a shell 'rm -rf'
    # string, which split paths on whitespace and could delete unrelated
    # directories.
    import shutil

    outputDir = self.options_.outputDir
    if outputDir is None:
        today = datetime.today()
        outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        print('output directory not specified, using %s' % outputDir)

    self.outputDir_ = os.path.abspath(outputDir)

    if os.path.isdir(self.outputDir_):
        answer = ''
        if not self.options_.force:
            # keep asking until we get an explicit y or n
            # (raw_input: this module targets Python 2)
            while answer not in ('y', 'n'):
                answer = raw_input('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n]')
        if answer == 'n':
            sys.exit(1)
        if os.path.islink(self.outputDir_):
            # like 'rm -rf', drop the link itself and leave its target alone
            os.remove(self.outputDir_)
        else:
            # best effort, like 'rm -rf' whose exit status was ignored
            shutil.rmtree(self.outputDir_, ignore_errors=True)

    self.mkdir(self.outputDir_)
143 
144 
def PrepareJob(self, value, dirname=None):
    '''Prepare the directory of a single job and register it.

    The job directory is self.outputDir_ joined with dirname, or with
    'Job_<value>' when dirname is None. The directory is created through
    self.mkdir, appended to self.listOfJobs_, and then handed together
    with the value to PrepareJobUser, which should be overloaded by the
    user.
    '''
    print('PrepareJob : %s' % value)
    jobName = dirname if dirname is not None else 'Job_{value}'.format(value=value)
    jobDir = '/'.join((self.outputDir_, jobName))
    # tab directly followed by the path (what the Python 2 print statement
    # emits for a tab item followed by another item)
    print('\t%s' % (jobDir,))
    self.mkdir(jobDir)
    self.listOfJobs_.append(jobDir)
    self.PrepareJobUser(jobDir, value)
159 
def PrepareJobUser(self, jobDir, value=None):
    '''Hook allowing user to define how one of his jobs should be prepared.

    jobDir : directory of the job being prepared.
    value  : value the job is prepared for.

    PrepareJob calls this hook as PrepareJobUser(jobDir, value); the
    signature now accepts that call (it used to take a single argument,
    so the default hook raised TypeError). A single-argument call is
    still accepted. This default implementation only prints a reminder.
    '''
    # '\tto': a tab followed by the word "to" (the previous literal lost
    # the "t" to the tab escape)
    print('\tto be customized')
163 
164 
def SubmitJobs(self, waitingTimeInSec=0):
    '''Submit all jobs. Possibly wait between each job.

    Does nothing but print a notice when the negate option is set.
    Otherwise, for every directory in self.listOfJobs_, changes into that
    directory, calls SubmitJob, comes back to the starting directory and
    sleeps waitingTimeInSec seconds.

    The starting directory is restored even when SubmitJob raises, so a
    failed submission does not leave the process inside a job directory.
    '''
    if self.options_.negate:
        print('*NOT* SUBMITTING JOBS - exit ')
        return

    print('SUBMITTING JOBS ======== ')
    startDir = os.getcwd()
    for jobDir in self.listOfJobs_:
        print('processing  %s' % (jobDir,))
        # run it
        os.chdir(jobDir)
        try:
            self.SubmitJob(jobDir)
        finally:
            # and come back, whatever happened
            os.chdir(startDir)
        print('waiting %s seconds...' % waitingTimeInSec)
        time.sleep(waitingTimeInSec)
    print('done.')
183 
def SubmitJob(self, jobDir):
    '''Hook for job submission.

    Default behaviour: announce the job directory, then run the command
    stored in the batch option through os.system. The exit status of the
    command is not inspected.
    '''
    print('submitting (to be customized):  %s' % (jobDir,))
    batchCommand = self.options_.batch
    os.system(batchCommand)
188 
189 
def CheckBatchScript(self, batchScript):
    '''Check the copy targets named in a batch script.

    An empty batchScript name is accepted and nothing is done. Otherwise
    the script is scanned for lines of the form 'cp ... $jobdir <target>';
    for each of them, <target> (with environment variables expanded):
      - must not already be a directory, else the program exits with
        status 2;
      - is created with os.mkdir, unless the negate option is set.

    Exits with status 3 when the script does not exist or cannot be
    opened.
    '''
    if batchScript == '':
        return

    if not os.path.isfile(batchScript):
        print('file  %s  does not exist' % (batchScript,))
        sys.exit(3)

    # compiled once, as a raw string so the backslashes reach the regex
    # engine untouched
    copyPattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")

    try:
        ifile = open(batchScript)
    except IOError:
        print('cannot open input %s' % batchScript)
        sys.exit(3)

    # the with-block closes the script, including on the sys.exit below
    with ifile:
        for line in ifile:
            m = copyPattern.match(line)
            if not m:
                continue
            target = os.path.expandvars(m.group(1))
            if os.path.isdir(target):
                print('output directory  %s already exists!' % (m.group(1),))
                print('exiting')
                sys.exit(2)
            if not self.options_.negate:
                os.mkdir(target)
            else:
                print('not making dir %s' % (self.options_.negate,))
218 
219  # create a directory
def mkdir(self, dirname):
    """Create dirname, including missing parent directories.

    An already existing directory is accepted silently. If the directory
    cannot be created (for instance because a file with that name is in
    the way), a message is printed and the program exits with status 4.

    The directory is created with os.makedirs instead of a shell
    'mkdir -p' string, so names containing spaces or shell
    metacharacters are handled literally.
    """
    try:
        os.makedirs(dirname)
    except OSError:
        # makedirs also fails when the directory is already there; only
        # a directory that is still missing afterwards is an error
        if not os.path.isdir(dirname):
            print('please remove or rename directory:  %s' % (dirname,))
            sys.exit(4)
227 
228 
def RunningMode(self, batch):
    '''Returns "LXPLUS", "PISA", "PSI" or "LOCAL".

    "LXPLUS" : batch command is bsub, and the host name starts with lxplus
    "PISA"   : batch command is bsub, and the host name contains gridui
               or faiwn
    "PSI"    : batch command is qsub, and the host name starts with t3ui
    "LOCAL"  : batch command is nohup or ./batchScript.sh
    In all other cases, a ValueError is raised (unknown or empty batch
    command, or a batch command that cannot be run from this host).

    The host name is read from the HOSTNAME environment variable; when it
    is unset or empty, socket.gethostname() is used instead.
    '''
    hostName = os.environ.get('HOSTNAME')
    if not hostName:
        # HOSTNAME is a shell variable that is not always exported
        import socket
        hostName = socket.gethostname()

    onLxplus = hostName.startswith('lxplus')
    onPSI = hostName.startswith('t3ui')
    onPISA = ('gridui' in hostName) or ('faiwn' in hostName)

    # an empty command falls through to the "unknown batch command" error
    words = batch.split()
    batchCmd = words[0] if words else ''

    if batchCmd == 'bsub':
        if not (onLxplus or onPISA):
            raise ValueError('Cannot run %s on %s' % (batchCmd, hostName))
        if onPISA:
            print('running on LSF pisa : %s from %s' % (batchCmd, hostName))
            return 'PISA'
        print('running on LSF lxplus: %s from %s' % (batchCmd, hostName))
        return 'LXPLUS'

    if batchCmd == "qsub":
        if not onPSI:
            raise ValueError('Cannot run %s on %s' % (batchCmd, hostName))
        print('running on SGE : %s from %s' % (batchCmd, hostName))
        return 'PSI'

    if batchCmd in ('nohup', './batchScript.sh'):
        print('running locally : %s on %s' % (batchCmd, hostName))
        return 'LOCAL'

    raise ValueError('unknown batch command: X%sX' % batchCmd)
tuple zip
Definition: archive.py:476
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
if(conf.exists("allCellsPositionCalc"))