CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
batchmanager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from datetime import datetime
4 from optparse import OptionParser
5 
6 import sys
7 import os
8 import re
9 import pprint
10 import time
11 
12 
14  """
15  This class manages batch jobs
16  Used in batch scripts
17  Colin Bernet 2008
18  """
19 
20  # constructor
21  # self is this
22  # parse batch manager options
def __init__(self):
    """Create the manager and register its command-line options.

    Construction only declares the options (through DefineOptions);
    nothing is parsed until ParseOptions is called.
    """
    self.DefineOptions()
25 
26 
def DefineOptions(self):
    """Build the command-line parser and register the batch options.

    The parser is stored in ``self.parser_``; nothing is parsed here.
    Registered options (destination attribute, default):

    -o / --output-dir   outputDir,  None
    -r / --remote-copy  remoteCopy, None
    -f / --force        force,      False (flag)
    -n / --negate       negate,     False (flag)
    -b / --batch        batch,      "bsub -q 8nh < ./batchScript.sh"
    """
    # define options and arguments ====================================
    # how to add more doc to the help?
    parser = self.parser_ = OptionParser()
    parser.add_option(
        "-o", "--output-dir", dest="outputDir", default=None,
        help="Name of the local output directory for your jobs. This directory will be created automatically.")
    parser.add_option(
        "-r", "--remote-copy", dest="remoteCopy", default=None,
        help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory is tarred and compressed into Logger.tgz, and sent to the remote output directory as well. Afterwards, use logger.py to access the information contained in Logger.tgz. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Logs will be sent back to the submision directory.")
    parser.add_option(
        "-f", "--force", action="store_true", dest="force", default=False,
        help="Don't ask any questions, just over-write")
    # this opt can be removed
    parser.add_option(
        "-n", "--negate", action="store_true", dest="negate", default=False,
        help="create jobs, but does not submit the jobs.")
    # '%default' is expanded by optparse, so the help text can no longer
    # drift away from the value actually used as default.
    parser.add_option(
        "-b", "--batch", dest="batch",
        default="bsub -q 8nh < ./batchScript.sh",
        help="batch command. default is: '%default'. You can also use 'nohup < ./batchScript.sh &' to run locally.")
47 
48 
def ParseOptions(self):
    """Parse the command line and prepare the remote and local output areas.

    Stores the parsed options and positional arguments in
    ``self.options_`` / ``self.args_`` and sets ``self.remoteOutputDir_``:

    * no --remote-copy: the empty string;
    * a path containing "psi.ch": it must start with /pnfs/psi.ch; the
      directory and a sub-directory named after the local output directory
      are created with ``gfal-mkdir``;
    * anything else is handled through the eostools helpers: the value
      must pass ``isLFN``, is converted with ``lfnToEOS`` and is created
      when ``isDirectory`` reports False.

    The local output directory is then handled by ManageOutputDir.

    Returns:
        The ``(options, args)`` pair produced by the option parser.

    Raises:
        ValueError: the EOS directory already exists and neither --negate
            nor --force was given.
        SystemExit: status 1 when the remote directory is rejected.
    """
    (self.options_, self.args_) = self.parser_.parse_args()
    remoteCopy = self.options_.remoteCopy
    if remoteCopy is None:
        self.remoteOutputDir_ = ""
    else:
        # Imported here so the project module is only required when a
        # remote copy is actually requested.
        import CMGTools.Production.eostools as castortools
        # removing possible trailing slash
        self.remoteOutputDir_ = remoteCopy.rstrip('/')

        if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI
            # overwriting protection to be improved
            if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
                print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
                print('%s not valid' % (self.remoteOutputDir_,))
                sys.exit(1)
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            outputDir = self.options_.outputDir
            if outputDir is None:
                today = datetime.today()
                outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
            self.remoteOutputDir_ += "/" + outputDir
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
        else:  # assume EOS
            if not castortools.isLFN(self.remoteOutputDir_):
                print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
                print(self.remoteOutputDir_)
                sys.exit(1)
            self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
            dirExist = castortools.isDirectory(self.remoteOutputDir_)
            if dirExist is False:
                # two spaces: same output as the former two-item print
                print('creating  %s' % (self.remoteOutputDir_,))
                if castortools.isEOSFile(self.remoteOutputDir_):
                    # the output directory is currently a file..
                    # need to remove it.
                    castortools.rm(self.remoteOutputDir_)
                castortools.createEOSDir(self.remoteOutputDir_)
            else:
                # directory exists.
                if self.options_.negate is False and self.options_.force is False:
                    # COLIN need to reimplement protectedRemove in eostools
                    raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    # NOTE(review): the listing this code was taken from skips one numbered
    # line right before this call - confirm against the real file that no
    # statement was lost here.
    self.ManageOutputDir()
    return (self.options_, self.args_)
98 
99 
def PrepareJobs(self, listOfValues, listOfDirNames=None):
    """Prepare one job per value and print the resulting job list.

    ``self.listOfJobs_`` is reset to an empty list and PrepareJob is then
    called for every value.

    Args:
        listOfValues: values handed one by one to PrepareJob.
        listOfDirNames: optional directory names, paired with the values
            through ``zip`` (extra entries on either side are ignored).
            When None, PrepareJob picks the directory names itself.
    """
    print('PREPARING JOBS ======== ')
    self.listOfJobs_ = []

    if listOfDirNames is None:
        for value in listOfValues:
            self.PrepareJob(value)
    else:
        for value, name in zip(listOfValues, listOfDirNames):
            self.PrepareJob(value, name)

    print("list of jobs:")
    pprint.pprint(self.listOfJobs_, indent=4)
113 
114 
115  # create output dir, if necessary
def ManageOutputDir(self):
    """Choose, clean up and create the local output directory.

    When --output-dir was not given, a name of the form
    ``OutCmsBatch_<timestamp>`` is generated. The absolute path is stored
    in ``self.outputDir_``.

    If the directory already exists the user is asked to confirm (unless
    --force was given): answering 'n' exits with status 1, otherwise the
    directory is removed. In every other case the directory is (re)created
    through ``self.mkdir``.
    """
    outputDir = self.options_.outputDir
    if outputDir is None:
        today = datetime.today()
        outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        print('output directory not specified, using %s' % outputDir)

    self.outputDir_ = os.path.abspath(outputDir)

    if os.path.isdir(self.outputDir_):
        answer = ''
        if not self.options_.force:
            try:
                ask = raw_input  # Python 2
            except NameError:
                ask = input  # Python 3
            while answer != 'y' and answer != 'n':
                answer = ask('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n]')
        if answer == 'n':
            sys.exit(1)
        # Run rm with an argument list instead of a shell string so that a
        # path containing spaces or shell metacharacters removes exactly
        # this directory and nothing else.
        import subprocess
        subprocess.call(['rm', '-rf', self.outputDir_])

    self.mkdir(self.outputDir_)
143 
144 
def PrepareJob(self, value, dirname=None):
    '''Prepare the directory of one job and delegate to the user hook.

    The job directory is ``<self.outputDir_>/<dirname>``; when dirname is
    None, ``Job_<value>`` is used. The directory is created with
    self.mkdir, appended to self.listOfJobs_, and PrepareJobUser is then
    called with (jobDir, value); PrepareJobUser is meant to be overloaded
    by the user.
    '''
    # (value,) keeps the formatting valid when value is itself a tuple
    print('PrepareJob : %s' % (value,))
    dname = dirname
    if dname is None:
        dname = 'Job_{value}'.format(value=value)
    jobDir = '/'.join([self.outputDir_, dname])
    # no separator after the tab: same output as the former print statement
    print('\t%s' % (jobDir,))
    self.mkdir(jobDir)
    self.listOfJobs_.append(jobDir)
    self.PrepareJobUser(jobDir, value)
159 
def PrepareJobUser(self, jobDir=None, value=None):
    '''Hook allowing user to define how one of his jobs should be prepared.

    PrepareJob calls this hook as ``PrepareJobUser(jobDir, value)``, so the
    default implementation accepts both arguments (the former one-argument
    form is still accepted). It only prints a reminder; overload it to
    fill the job directory.
    '''
    print('\tto be customized')
163 
164 
def SubmitJobs(self, waitingTimeInSec=0):
    '''Submit all jobs. Possibly wait between each job.

    Does nothing except printing a notice when --negate was given.
    Otherwise, for every directory in self.listOfJobs_, the process
    changes into that directory, calls SubmitJob, returns to the previous
    working directory and sleeps for waitingTimeInSec seconds.
    '''
    if self.options_.negate:
        print('*NOT* SUBMITTING JOBS - exit ')
        return

    print('SUBMITTING JOBS ======== ')
    for jobDir in self.listOfJobs_:
        root = os.getcwd()
        # run it (two spaces: same output as the former two-item print)
        print('processing  %s' % (jobDir,))
        os.chdir(jobDir)
        try:
            self.SubmitJob(jobDir)
        finally:
            # and come back, even when the submission hook raised
            os.chdir(root)
        print('waiting %s seconds...' % waitingTimeInSec)
        time.sleep(waitingTimeInSec)
        print('done.')
183 
def SubmitJob(self, jobDir):
    '''Hook for job submission.

    The default implementation prints the job directory and runs the
    command given by the --batch option through os.system. SubmitJobs
    calls it after changing into jobDir.
    '''
    # two spaces: same output as the former two-item print statement
    print('submitting (to be customized):  %s' % (jobDir,))
    os.system(self.options_.batch)
188 
189 
def CheckBatchScript(self, batchScript):
    """Check a batch script and create the directories it copies to.

    An empty batchScript is accepted silently. Otherwise every line of
    the form ``cp ... $jobdir <target>`` is inspected: if <target> (after
    environment-variable expansion) already exists as a directory the
    program exits with status 2; else the directory is created, unless
    --negate was given.

    Exits with status 3 when the script is not a file or cannot be opened.
    """
    if batchScript == '':
        return

    if not os.path.isfile(batchScript):
        print('file  %s  does not exist' % (batchScript,))
        sys.exit(3)

    try:
        ifile = open(batchScript)
    except IOError:
        print('cannot open input %s' % batchScript)
        sys.exit(3)

    # compiled once, as a raw string so the backslashes reach the regex
    # engine unchanged
    copyLine = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")
    with ifile:  # the script is closed on every exit path
        for line in ifile:
            m = copyLine.match(line)
            if not m:
                continue
            target = m.group(1)
            expanded = os.path.expandvars(target)
            if os.path.isdir(expanded):
                print('output directory  %s already exists!' % (target,))
                print('exiting')
                sys.exit(2)
            if not self.options_.negate:
                os.mkdir(expanded)
            else:
                print('not making dir %s' % (self.options_.negate,))
215  os.mkdir( os.path.expandvars(m.group(1)) )
216  else:
217  print 'not making dir', self.options_.negate
218 
219  # create a directory
def mkdir(self, dirname):
    """Create dirname and any missing parent directories.

    An already existing directory is not an error. If the directory
    cannot be created (for instance because a file with that name
    exists), a message is printed and the program exits with status 4.
    """
    try:
        os.makedirs(dirname)
    except OSError:
        # Like ``mkdir -p``: only fail when no directory is there afterwards.
        if not os.path.isdir(dirname):
            print('please remove or rename directory:  %s' % (dirname,))
            sys.exit(4)
227 
228 
def RunningMode(self, batch):
    '''Return the running mode deduced from the batch command and the host.

    The first word of batch is the batch command; the host name is read
    from the HOSTNAME environment variable, or from socket.gethostname()
    when that variable is not set.

    "LXPLUS" : batch command is bsub, host name starts with "lxplus"
    "PISA"   : batch command is bsub, host name contains "gridui" or "faiwn"
    "IC"     : batch command is qsub, host name contains "hep.ph.ic.ac.uk"
    "PSI"    : batch command is qsub, host name starts with "t3ui"
    "LOCAL"  : batch command is nohup or ./batchScript.sh

    None is returned for qsub on any other host. A ValueError is raised
    for bsub on any other host and for an unknown batch command.
    '''
    hostName = os.environ.get('HOSTNAME')
    if hostName is None:
        # HOSTNAME is a shell variable that is not always exported
        import socket
        hostName = socket.gethostname()

    onLxplus = hostName.startswith('lxplus')
    onPSI = hostName.startswith('t3ui')
    onPISA = 'gridui' in hostName or 'faiwn' in hostName
    onIC = 'hep.ph.ic.ac.uk' in hostName
    batchCmd = batch.split()[0]

    if batchCmd == 'bsub':
        if onPISA:
            print('running on LSF pisa : %s from %s' % (batchCmd, hostName))
            return 'PISA'
        if onLxplus:
            print('running on LSF lxplus: %s from %s' % (batchCmd, hostName))
            return 'LXPLUS'
        raise ValueError('Cannot run %s on %s' % (batchCmd, hostName))

    if batchCmd == 'qsub':
        if onIC:
            print('running on IC : %s from %s' % (batchCmd, hostName))
            return 'IC'
        if onPSI:
            print('running on SGE : %s from %s' % (batchCmd, hostName))
            return 'PSI'
        # qsub from any other host is not rejected: None is returned
        return None

    if batchCmd == 'nohup' or batchCmd == './batchScript.sh':
        print('running locally : %s on %s' % (batchCmd, hostName))
        return 'LOCAL'

    raise ValueError('unknown batch command: X%sX' % batchCmd)
tuple zip
Definition: archive.py:476
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
if(conf.exists("allCellsPositionCalc"))