CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
batchmanager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from datetime import datetime
4 from optparse import OptionParser
5 
6 import sys
7 import os
8 import re
9 import pprint
10 import time
11 
12 
14  """
15  This class manages batch jobs
16  Used in batch scripts
17  Colin Bernet 2008
18  """
19 
20  # constructor
21  # self is this
22  # parse batch manager options
def __init__(self):
    '''Initialise the manager by declaring its command-line options.

    Only DefineOptions() is called here; nothing is parsed until
    ParseOptions() is invoked.
    '''
    self.DefineOptions()
25 
26 
def DefineOptions(self):
    '''Create self.parser_ and declare the options shared by batch scripts.

    Nothing is parsed here. Declared options (destination, default):
      -o / --output-dir   outputDir   None
      -r / --remote-copy  remoteCopy  None
      -f / --force        force       False
      -n / --negate       negate      False
      -b / --batch        batch       "bsub -q 8nh < ./batchScript.sh"
      -p / --parametric   parametric  False
    '''
    # define options and arguments ====================================
    # how to add more doc to the help?
    self.parser_ = OptionParser()
    self.parser_.add_option(
        "-o", "--output-dir", dest="outputDir",
        help="Name of the local output directory for your jobs. "
             "This directory will be created automatically.",
        default=None)
    self.parser_.add_option(
        "-r", "--remote-copy", dest="remoteCopy",
        help="remote output directory for your jobs. Example: "
             "/store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. "
             "This directory *must* be provided as a logical file name (LFN). "
             "When this option is used, all root files produced by a job are "
             "copied to the remote directory, and the job index is appended to "
             "the root file name. The Logger directory is tarred and compressed "
             "into Logger.tgz, and sent to the remote output directory as well. "
             "Afterwards, use logger.py to access the information contained in "
             "Logger.tgz. For remote copy to PSI specify path like: "
             "'/pnfs/psi.ch/...'. Logs will be sent back to the submission "
             "directory. NOTE: so far this option has been implemented and "
             "validated to work only for a remote copy to PSI",
        default=None)
    self.parser_.add_option(
        "-f", "--force", action="store_true",
        dest="force", default=False,
        help="Don't ask any questions, just over-write")
    # this opt can be removed
    self.parser_.add_option(
        "-n", "--negate", action="store_true",
        dest="negate", default=False,
        help="create jobs, but does not submit the jobs.")
    # optparse substitutes %default, so the help can no longer drift away
    # from the value actually used.
    self.parser_.add_option(
        "-b", "--batch", dest="batch",
        help="batch command. default is: '%default'. You can also use "
             "'nohup < ./batchScript.sh &' to run locally.",
        default="bsub -q 8nh < ./batchScript.sh")
    self.parser_.add_option(
        "-p", "--parametric", action="store_true",
        dest="parametric", default=False,
        help="submit jobs parametrically, implemented for IC so far")
50 
51 
def ParseOptions(self):
    '''Parse the command line and prepare the remote and local output dirs.

    Sets self.options_, self.args_ and self.remoteOutputDir_ ("" when
    -r/--remote-copy is not given), then calls self.ManageOutputDir().

    With -r/--remote-copy, the trailing slash is stripped and:
      * a path containing "psi.ch" must start with /pnfs/psi.ch, otherwise
        the process exits with status 1. The directory, and a
        sub-directory named after the last component of --output-dir (or a
        generated OutCmsBatch_<timestamp> name when --output-dir is not
        given), are created with gfal-mkdir.
      * any other path is handled through CMGTools.Production.eostools:
        it must be an LFN (exit status 1 otherwise), is converted with
        lfnToEOS, and is created if isDirectory() reports False.

    Returns:
        The (options, args) pair produced by the option parser.

    Raises:
        ValueError: the EOS directory already exists and neither --negate
            nor --force was given.

    NOTE(review): the block nesting was reconstructed from a listing that
    had lost its indentation - confirm against the upstream file.
    '''
    (self.options_, self.args_) = self.parser_.parse_args()
    if self.options_.remoteCopy is None:
        self.remoteOutputDir_ = ""
    else:
        # removing possible trailing slash
        self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')

        if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI:
            # overwriting protection to be improved
            if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
                print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
                print('%s %s' % (self.remoteOutputDir_, "not valid"))
                sys.exit(1)

            # os.environ.get returns None (not the string "None") when the
            # variable is unset; only touch the variable when it exists.
            ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
            if ld_lib_path is not None:
                # to solve gfal conflict with CMSSW
                os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path
            try:
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
                outputDir = self.options_.outputDir
                if outputDir is None:
                    today = datetime.today()
                    outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
                else:
                    # keep only the last component, to for instance direct
                    # output to /afs/cern.ch/work/u/user/outputDir
                    outputDir = outputDir.rstrip("/").split("/")[-1]
                self.remoteOutputDir_ += "/" + outputDir
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            finally:
                if ld_lib_path is not None:
                    # back to original to avoid conflicts
                    os.environ['LD_LIBRARY_PATH'] = ld_lib_path
        else:  # assume EOS
            # Imported here because only this branch uses it.
            import CMGTools.Production.eostools as castortools
            if not castortools.isLFN(self.remoteOutputDir_):
                print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
                print(self.remoteOutputDir_)
                sys.exit(1)
            self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
            dirExist = castortools.isDirectory(self.remoteOutputDir_)
            if dirExist is False:
                print('%s %s' % ('creating ', self.remoteOutputDir_))
                if castortools.isEOSFile(self.remoteOutputDir_):
                    # the output directory is currently a file..
                    # need to remove it.
                    castortools.rm(self.remoteOutputDir_)
                castortools.createEOSDir(self.remoteOutputDir_)
            else:
                # directory exists.
                if self.options_.negate is False and self.options_.force is False:
                    #COLIN need to reimplement protectedRemove in eostools
                    raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    # NOTE(review): the listing this was taken from skips one numbered line
    # right here - check the upstream file for a statement missing before
    # the ManageOutputDir() call.
    self.ManageOutputDir()
    return (self.options_, self.args_)
106 
107 
def PrepareJobs(self, listOfValues, listOfDirNames=None):
    '''Prepare one job per entry of listOfValues and print the job list.

    self.listOfJobs_ is reset to an empty list first. Each value is handed
    to self.PrepareJob; when listOfDirNames is given, values and names are
    paired with zip() and the name is passed along as well.
    '''
    print('PREPARING JOBS ======== ')
    self.listOfJobs_ = []

    if listOfDirNames is not None:
        for jobValue, jobName in zip(listOfValues, listOfDirNames):
            self.PrepareJob(jobValue, jobName)
    else:
        for jobValue in listOfValues:
            self.PrepareJob(jobValue)

    print("list of jobs:")
    pprint.PrettyPrinter(indent=4).pprint(self.listOfJobs_)
121 
122 
123  # create output dir, if necessary
def ManageOutputDir(self):
    '''Choose, clear and create the local output directory.

    Uses self.options_.outputDir, or a generated OutCmsBatch_<timestamp>
    name when it is None, and stores its absolute path in self.outputDir_.
    If that directory already exists the user is asked to confirm, unless
    self.options_.force is set; answering "n" exits with status 1.
    Otherwise the existing directory is removed. Finally the directory is
    created through self.mkdir().

    NOTE(review): the nesting of the confirmation logic was reconstructed
    from a listing that had lost its indentation. This assumes that both a
    "y" answer and --force lead to the removal - confirm upstream.
    '''
    outputDir = self.options_.outputDir

    if outputDir is None:
        today = datetime.today()
        outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        print('output directory not specified, using %s' % outputDir)

    self.outputDir_ = os.path.abspath(outputDir)

    if os.path.isdir(self.outputDir_):
        if not self.options_.force:
            try:
                ask = raw_input  # Python 2
            except NameError:
                ask = input  # Python 3
            answer = ''
            while answer != 'y' and answer != 'n':
                answer = ask('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n]')
            if answer == 'n':
                sys.exit(1)

        # Remove through the standard library rather than a shell string,
        # so paths with spaces or shell metacharacters are handled.
        import shutil
        if os.path.islink(self.outputDir_):
            # rmtree refuses symbolic links; drop the link itself.
            os.remove(self.outputDir_)
        else:
            shutil.rmtree(self.outputDir_)

    self.mkdir(self.outputDir_)
151 
152 
def PrepareJob(self, value, dirname=None):
    '''Create the directory of one job and register it.

    The job directory is self.outputDir_ joined with dirname, or with
    "Job_<value>" when dirname is None. It is created through
    self.mkdir(), appended to self.listOfJobs_, and then handed to
    PrepareJobUser (the hook users are expected to overload) together
    with value.
    '''
    print('PrepareJob : %s' % value)
    subDir = dirname if dirname is not None else 'Job_{value}'.format(value=value)
    jobDir = '/'.join((self.outputDir_, subDir))
    print('%s%s' % ('\t', jobDir))
    self.mkdir(jobDir)
    self.listOfJobs_.append(jobDir)
    self.PrepareJobUser(jobDir, value)
167 
def PrepareJobUser(self, jobDir=None, value=None):
    '''Hook allowing user to define how one of his jobs should be prepared.

    PrepareJob calls this hook as PrepareJobUser(jobDir, value), so the
    default implementation accepts both arguments. Both are optional so
    that a call passing only one argument, or only value=..., still works.
    This default ignores them and only prints a reminder.
    '''
    print('\tto be customized')
171 
172 
def SubmitJobs(self, waitingTimeInSec=0):
    '''Submit all jobs. Possibly wait between each job.

    Does nothing but print a notice when self.options_.negate is set.
    Otherwise the running mode is obtained from
    self.RunningMode(self.options_.batch):

      * mode "IC" with self.options_.parametric: the job directories are
        written to jobDirectories.txt and a parametricSubmit.sh array
        script is written in self.outputDir_, then submitted once with
        qsub. The queue is the token following "-q" in the batch command,
        or "hepshort.q" when there is none.
      * any other case: for each directory in self.listOfJobs_, change
        into it, call self.SubmitJob(jobDir), come back, and sleep
        waitingTimeInSec seconds.
    '''
    if self.options_.negate:
        print('*NOT* SUBMITTING JOBS - exit ')
        return
    print('SUBMITTING JOBS ======== ')

    mode = self.RunningMode(self.options_.batch)

    # If at IC write all the job directories to a file then submit a parameteric
    # job that depends on the file number. This is required to circumvent the 2000
    # individual job limit at IC
    if mode == "IC" and self.options_.parametric:

        jobDirsFile = os.path.join(self.outputDir_, "jobDirectories.txt")
        with open(jobDirsFile, 'w') as f:
            for jobDir in self.listOfJobs_:
                f.write(str(jobDir) + "\n")

        readLine = "readarray JOBDIR < " + jobDirsFile + "\n"

        submitScript = os.path.join(self.outputDir_, "parametricSubmit.sh")
        with open(submitScript, 'w') as batchScript:
            batchScript.write("#!/bin/bash\n")
            batchScript.write("#$ -e /dev/null -o /dev/null \n")
            batchScript.write("cd " + self.outputDir_ + "\n")
            batchScript.write(readLine)
            batchScript.write("cd ${JOBDIR[${SGE_TASK_ID}-1]}\n")
            batchScript.write("./batchScript.sh > BATCH_outputLog.txt 2> BATCH_errorLog.txt")

        # Find the queue; fall back to the default when "-q" is absent or
        # is the last token (nothing follows it).
        splitBatchOptions = self.options_.batch.split()
        queue = "hepshort.q"
        if '-q' in splitBatchOptions:
            queueIndex = splitBatchOptions.index('-q') + 1
            if queueIndex < len(splitBatchOptions):
                queue = splitBatchOptions[queueIndex]

        os.system("qsub -q " + queue + " -t 1-" + str(len(self.listOfJobs_)) + " " + submitScript)

    else:
        # continue as before, submitting one job per directory
        root = os.getcwd()
        for jobDir in self.listOfJobs_:
            # run it
            print('%s %s' % ('processing ', jobDir))
            os.chdir(jobDir)
            try:
                self.SubmitJob(jobDir)
            finally:
                # and come back, even if the submission hook raised
                os.chdir(root)
            print('waiting %s seconds...' % waitingTimeInSec)
            time.sleep(waitingTimeInSec)
            print('done.')
225 
def SubmitJob(self, jobDir):
    '''Hook for job submission.

    The default implementation prints the job directory and runs the
    batch command stored in self.options_.batch through os.system.
    '''
    batchCommand = self.options_.batch
    print('%s %s' % ('submitting (to be customized): ', jobDir))
    os.system(batchCommand)
230 
231 
def CheckBatchScript(self, batchScript):
    '''Check the output directories a batch script copies its job dir to.

    An empty batchScript name is accepted silently. The process exits
    with status 3 when the file does not exist or cannot be opened.

    Every line matching "cp ... $jobdir <target>" is inspected, with
    environment variables in <target> expanded:
      * if <target> is already a directory, exit with status 2;
      * otherwise create it with os.mkdir, unless self.options_.negate is
        set, in which case only a message is printed.

    NOTE(review): the block nesting was reconstructed from a listing that
    had lost its indentation - confirm against the upstream file.
    '''
    if batchScript == '':
        return

    if not os.path.isfile(batchScript):
        print('%s %s %s' % ('file ', batchScript, ' does not exist'))
        sys.exit(3)

    try:
        ifile = open(batchScript)
    except IOError:
        print('cannot open input %s' % batchScript)
        sys.exit(3)

    # Compiled once, outside the loop; raw string so the backslashes reach
    # the regex engine untouched.
    copyPattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")
    with ifile:  # make sure the script is closed, also on sys.exit
        for line in ifile:
            m = copyPattern.match(line)
            if not m:
                continue
            target = os.path.expandvars(m.group(1))
            if os.path.isdir(target):
                print('%s %s %s' % ('output directory ', m.group(1), 'already exists!'))
                print('exiting')
                sys.exit(2)
            if self.options_.negate == False:
                os.mkdir(target)
            else:
                print('%s %s' % ('not making dir', self.options_.negate))
260 
261  # create a directory
def mkdir(self, dirname):
    '''Create dirname and any missing parent directories.

    An already existing directory is accepted. If the directory cannot be
    created (for instance because the path exists as a regular file), a
    message is printed and the process exits with status 4.

    The path is passed to os.makedirs as-is, so spaces and shell
    metacharacters are safe; it is no longer expanded by a shell.
    '''
    try:
        os.makedirs(dirname)
    except OSError:
        # makedirs also fails when the directory is already there, which
        # is fine; anything else is a real failure.
        if not os.path.isdir(dirname):
            print('%s %s' % ('please remove or rename directory: ', dirname))
            sys.exit(4)
269 
270 
def RunningMode(self, batch):
    '''Return the running mode matching the batch command and the host.

    The batch command is the first word of batch; the host name is read
    from the HOSTNAME environment variable, falling back to
    socket.gethostname() when that variable is missing or empty.

    "LXPLUS" : batch command is bsub, and logged on lxplus
    "PISA"   : batch command is bsub, host name contains gridui or faiwn
    "PADOVA" : batch command is bsub, host is t2-ui*pd.infn* or
               t2-cld*lnl.infn*
    "IC"     : batch command is qsub, and logged to hep.ph.ic.ac.uk
    "PSI"    : batch command is qsub, and logged to t3uiXX
    "LOCAL"  : batch command is nohup or ./batchScript.sh
    None     : batch command is qsub on any other host (the host check
               for qsub is disabled)

    Raises:
        ValueError: bsub is used on a host that is none of lxplus, PISA
            or PADOVA, or the batch command is unknown (including an
            empty batch string).
    '''
    hostName = os.environ.get('HOSTNAME')
    if not hostName:
        # HOSTNAME is frequently a non-exported shell variable.
        import socket
        hostName = socket.gethostname()

    onLxplus = hostName.startswith('lxplus')
    onPSI = hostName.startswith('t3ui')
    onPISA = re.match('.*gridui.*', hostName) or re.match('.*faiwn.*', hostName)
    onPADOVA = (
        (hostName.startswith('t2-ui') and re.match('.*pd.infn.*', hostName))
        or (hostName.startswith('t2-cld') and re.match('.*lnl.infn.*', hostName))
    )
    onIC = 'hep.ph.ic.ac.uk' in hostName

    batchWords = batch.split()
    # An empty command falls through to the "unknown batch command" error.
    batchCmd = batchWords[0] if batchWords else ''

    if batchCmd == 'bsub':
        if not (onLxplus or onPISA or onPADOVA):
            err = 'Cannot run %s on %s' % (batchCmd, hostName)
            raise ValueError(err)
        if onPISA:
            print('running on LSF pisa : %s from %s' % (batchCmd, hostName))
            return 'PISA'
        if onPADOVA:
            print('running on LSF padova: %s from %s' % (batchCmd, hostName))
            return 'PADOVA'
        print('running on LSF lxplus: %s from %s' % (batchCmd, hostName))
        return 'LXPLUS'

    if batchCmd == "qsub":
        if onIC:
            print('running on IC : %s from %s' % (batchCmd, hostName))
            return 'IC'
        if onPSI:
            print('running on SGE : %s from %s' % (batchCmd, hostName))
            return 'PSI'
        # qsub from any other host is tolerated and yields no mode.
        return None

    if batchCmd == 'nohup' or batchCmd == './batchScript.sh':
        print('running locally : %s on %s' % (batchCmd, hostName))
        return 'LOCAL'

    err = 'unknown batch command: X%sX' % batchCmd
    raise ValueError(err)
tuple zip
Definition: archive.py:476
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
if(conf.exists("allCellsPositionCalc"))
double split
Definition: MVATrainer.cc:139