CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
batchmanager.BatchManager Class Reference

Public Member Functions

def __init__ (self)
 
def CheckBatchScript (self, batchScript)
 
def DefineOptions (self)
 
def ManageOutputDir (self)
 
def mkdir (self, dirname)
 
def ParseOptions (self)
 
def PrepareJob (self, value, dirname=None)
 
def PrepareJobs (self, listOfValues, listOfDirNames=None)
 
def PrepareJobUser (self, value)
 
def RunningMode (self, batch)
 
def SubmitJob (self, jobDir)
 
def SubmitJobArray (self, numbOfJobs=1)
 
def SubmitJobs (self, waitingTimeInSec=0)
 

Public Attributes

 listOfJobs_
 
 outputDir_
 
 parser_
 
 remoteOutputDir_
 
 remoteOutputFile_
 

Detailed Description

This class manages batch jobs
Used in batch scripts
Colin Bernet 2008

Definition at line 14 of file batchmanager.py.

Constructor & Destructor Documentation

def batchmanager.BatchManager.__init__ (   self)

Definition at line 24 of file batchmanager.py.

References batchmanager.BatchManager.DefineOptions().

def __init__(self):
    '''Build the manager; the only work done here is registering the options.'''
    # DefineOptions creates self.parser_; parsing happens later in ParseOptions.
    self.DefineOptions()
26 
27 

Member Function Documentation

def batchmanager.BatchManager.CheckBatchScript (   self,
  batchScript 
)

Definition at line 204 of file batchmanager.py.

References reco.if().

def CheckBatchScript(self, batchScript):
    '''Check the copy destinations named in a batch script.

    Every line of the form "cp ... $jobdir <destination>" is inspected.
    The destination (after environment-variable expansion) must not exist
    yet; it is created here unless the --negate option was given.

    batchScript -- path of the script to scan; '' means "nothing to check".

    Exits the process with status 3 when the script is missing or cannot
    be opened, and with status 2 when a destination directory already
    exists.
    '''
    if batchScript == '':
        return

    if not os.path.isfile(batchScript):
        print('file  %s  does not exist' % batchScript)
        sys.exit(3)

    # Compiled once instead of once per line; raw string so the
    # backslashes reach the regex engine untouched.
    copyPattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")

    try:
        ifile = open(batchScript)
    except (IOError, OSError):
        # Only I/O failures are reported as "cannot open"; anything else
        # (KeyboardInterrupt, programming errors) is left to propagate.
        print('cannot open input %s' % batchScript)
        sys.exit(3)

    # The with-block closes the script even when sys.exit() is called below.
    with ifile:
        for line in ifile:
            m = copyPattern.match(line)
            if not m:
                continue
            destination = os.path.expandvars(m.group(1))
            if os.path.isdir(destination):
                print('output directory  %s already exists!' % m.group(1))
                print('exiting')
                sys.exit(2)
            if not self.options_.negate:
                os.mkdir(destination)
            else:
                print('not making dir %s' % self.options_.negate)
232 
def CheckBatchScript(self, batchScript)
if(dp >Float(M_PI)) dp-
def batchmanager.BatchManager.DefineOptions (   self)

Definition at line 28 of file batchmanager.py.

Referenced by batchmanager.BatchManager.__init__().

def DefineOptions(self):
    '''Create self.parser_ and register the options common to all batch scripts.

    Nothing is parsed here; ParseOptions does that later.
    '''
    parser = OptionParser()
    self.parser_ = parser

    # Where the job output goes: locally, and optionally on remote storage.
    parser.add_option(
        "-o", "--output-dir",
        dest="outputDir",
        default=None,
        help="Name of the local output directory for your jobs. This directory will be created automatically.")
    parser.add_option(
        "-r", "--remote-copy",
        dest="remoteCopy",
        default=None,
        help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy")

    # Boolean switches.
    parser.add_option(
        "-f", "--force",
        dest="force",
        action="store_true",
        default=False,
        help="Don't ask any questions, just over-write")
    # (the original author notes that this option could be removed)
    parser.add_option(
        "-n", "--negate",
        dest="negate",
        action="store_true",
        default=False,
        help="create jobs, but does not submit the jobs.")

    # How jobs are submitted, and free-form extras forwarded to the job config.
    parser.add_option(
        "-b", "--batch",
        dest="batch",
        default="bsub -q 8nh < ./batchScript.sh",
        help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.")
    parser.add_option(
        "--option",
        dest="extraOptions",
        type="string",
        action="append",
        default=[],
        help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")
54 
def batchmanager.BatchManager.ManageOutputDir (   self)

Definition at line 126 of file batchmanager.py.

def ManageOutputDir(self):
    '''Choose, clean and create the local output directory.

    The directory name comes from the --output-dir option; when that is
    not given, a time-stamped name "OutCmsBatch_<date>_<time>" is used.
    The absolute path is stored in self.outputDir_.

    If the directory already exists it is removed first. Without --force
    the user is asked to confirm; answering 'n' exits with status 1.
    The directory is then (re)created through self.mkdir.
    '''
    # Function-scope import: this is the only place the module is needed.
    import subprocess

    outputDir = self.options_.outputDir
    if outputDir is None:
        today = datetime.today()
        outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        print('output directory not specified, using %s' % outputDir)

    self.outputDir_ = os.path.abspath(outputDir)

    if os.path.isdir(self.outputDir_):
        answer = ''
        if not self.options_.force:
            # Keep asking until the reply is exactly 'y' or 'n'.
            while answer != 'y' and answer != 'n':
                answer = raw_input('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
        if answer == 'n':
            sys.exit(1)
        # Argument list instead of a shell string: a path containing spaces
        # or shell metacharacters is removed as one path, not re-parsed.
        subprocess.call(['rm', '-rf', self.outputDir_])

    self.mkdir(self.outputDir_)
153 
154 
def mkdir(self, dirname)
if(dp >Float(M_PI)) dp-
def batchmanager.BatchManager.mkdir (   self,
  dirname 
)

Definition at line 234 of file batchmanager.py.

References reco.if().

Referenced by batchmanager.BatchManager.PrepareJob().

def mkdir(self, dirname):
    '''Create dirname, including missing parent directories.

    An already existing directory is accepted silently. If the path cannot
    be made into a directory (for example a regular file is in the way) a
    message is printed and the process exits with status 4.
    '''
    try:
        # Standard-library equivalent of "mkdir -p"; no shell is involved,
        # so names with spaces or metacharacters are handled as given.
        os.makedirs(dirname)
    except OSError:
        # makedirs also raises when the directory is already there, which
        # "mkdir -p" treats as success; only a real failure is fatal.
        if not os.path.isdir(dirname):
            print('please remove or rename directory:  %s' % dirname)
            sys.exit(4)
241 
242 
def mkdir(self, dirname)
if(dp >Float(M_PI)) dp-
def batchmanager.BatchManager.ParseOptions (   self)

Definition at line 55 of file batchmanager.py.

References reco::parser::ExpressionQuaterOperator< Op >.args_, reco::parser::MethodInvoker.args_, ExternalLHEProducer.args_, cmdline::CmdLine.args_, Json::Path.args_, pftools::CalibCompare.options_, and pftools::Exercises3.options_.

def ParseOptions(self):
    '''Parse the command line and prepare the output locations.

    Stores the parsed options and positional arguments in self.options_
    and self.args_, resolves the remote output directory into
    self.remoteOutputDir_ ("" when --remote-copy is not used), then calls
    ManageOutputDir for the local directory.

    Returns the tuple (self.options_, self.args_).

    Exits with status 1 when the remote directory is not acceptable (a
    psi.ch path not starting with /pnfs/psi.ch, or a non-LFN path).
    Raises ValueError when the remote EOS directory already exists and
    neither --force nor --negate was given.
    '''
    (self.options_, self.args_) = self.parser_.parse_args()

    if self.options_.remoteCopy is None:
        self.remoteOutputDir_ = ""
    else:
        # removing possible trailing slash
        self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')
        if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI
            # overwriting protection to be improved
            if self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
                ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
                # os.environ.get returns the None object, never the string
                # "None", when the variable is unset; testing against the
                # string let the concatenation below fail with a TypeError.
                if ld_lib_path is not None:
                    os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path  # to solve gfal conflict with CMSSW
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
                outputDir = self.options_.outputDir
                if outputDir is None:
                    today = datetime.today()
                    outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
                self.remoteOutputDir_ += "/" + outputDir
                os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
                if ld_lib_path is not None:
                    os.environ['LD_LIBRARY_PATH'] = ld_lib_path  # back to original to avoid conflicts
            else:
                print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
                print('%s not valid' % self.remoteOutputDir_)
                sys.exit(1)
        else:  # assume EOS
            if not castortools.isLFN(self.remoteOutputDir_):
                print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
                print(self.remoteOutputDir_)
                sys.exit(1)
            self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
            dirExist = castortools.isDirectory(self.remoteOutputDir_)
            if dirExist is False:
                print('creating  %s' % self.remoteOutputDir_)
                if castortools.isEOSFile(self.remoteOutputDir_):
                    # the output directory is currently a file..
                    # need to remove it.
                    castortools.rm(self.remoteOutputDir_)
                castortools.createEOSDir(self.remoteOutputDir_)
            else:
                # directory exists.
                if self.options_.negate is False and self.options_.force is False:
                    # COLIN need to reimplement protectedRemove in eostools
                    raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    # NOTE(review): source line 105 is absent from the generated listing, but
    # the member index says remoteOutputFile_ is defined at that line. The
    # empty-string value mirrors remoteOutputDir_ and is an assumption --
    # confirm against batchmanager.py.
    self.remoteOutputFile_ = ""
    self.ManageOutputDir()
    return (self.options_, self.args_)
108 
109 
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def batchmanager.BatchManager.PrepareJob (   self,
  value,
  dirname = None 
)
Prepare a job for a given value.

calls PrepareJobUser, which should be overloaded by the user.

Definition at line 155 of file batchmanager.py.

References join(), batchHippy.MyBatchManager.mkdir(), TFileService.mkdir(), TFileDirectory.mkdir(), batchmanager.BatchManager.mkdir(), dqmoffline::l1t::L1TDiffHarvesting::L1TDiffPlotHandler.outputDir_, MillePedeFileExtractor.outputDir_, L1GtVhdlWriter.outputDir_, dqmoffline::l1t::L1TEfficiencyPlotHandler.outputDir_, batchmanager.BatchManager.outputDir_, L1GtVhdlWriterCore.outputDir_, LaserSorter.outputDir_, cmsBatch.MyBatchManager.PrepareJobUser(), and batchmanager.BatchManager.PrepareJobUser().

def PrepareJob(self, value, dirname=None):
    '''Create the directory of one job and hand it over to the user hook.

    value   -- identifies the job; also used for the default directory name
    dirname -- name of the job directory inside self.outputDir_;
               "Job_<value>" when omitted

    The job directory path is appended to self.listOfJobs_ and then passed,
    together with value, to PrepareJobUser, which should be overloaded by
    the user.
    '''
    print('PrepareJob : %s' % value)

    if dirname is None:
        jobName = 'Job_{value}'.format(value=value)
    else:
        jobName = dirname

    jobDir = '/'.join((self.outputDir_, jobName))
    print('\t%s' % jobDir)

    self.mkdir(jobDir)
    self.listOfJobs_.append(jobDir)
    self.PrepareJobUser(jobDir, value)
169 
def mkdir(self, dirname)
def PrepareJob(self, value, dirname=None)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def PrepareJobUser(self, value)
def batchmanager.BatchManager.PrepareJobs (   self,
  listOfValues,
  listOfDirNames = None 
)

Definition at line 110 of file batchmanager.py.

def PrepareJobs(self, listOfValues, listOfDirNames=None):
    '''Prepare one job per entry of listOfValues.

    listOfValues   -- values handed one by one to PrepareJob
    listOfDirNames -- optional directory names, paired with the values in
                      order; pairing stops at the shorter of the two lists

    self.listOfJobs_ is reset before the jobs are prepared and
    pretty-printed once they all are.
    '''
    print('PREPARING JOBS ======== ')
    self.listOfJobs_ = []

    if listOfDirNames is not None:
        for value, name in zip(listOfValues, listOfDirNames):
            self.PrepareJob(value, name)
    else:
        for value in listOfValues:
            self.PrepareJob(value)

    print("list of jobs:")
    pprint.PrettyPrinter(indent=4).pprint(self.listOfJobs_)
123 
124 
def PrepareJob(self, value, dirname=None)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def PrepareJobs(self, listOfValues, listOfDirNames=None)
def batchmanager.BatchManager.PrepareJobUser (   self,
  value 
)
Hook allowing user to define how one of his jobs should be prepared.

Definition at line 170 of file batchmanager.py.

Referenced by batchmanager.BatchManager.PrepareJob().

def PrepareJobUser(self, jobDir, value=None):
    '''Hook allowing the user to define how one job should be prepared.

    jobDir -- directory created for the job by PrepareJob
    value  -- the value PrepareJob was called with

    PrepareJob calls this hook as PrepareJobUser(jobDir, value); the former
    one-argument signature made that call fail with a TypeError whenever
    the hook was not overloaded. value is optional so that a one-argument
    positional call is still accepted. This default implementation only
    prints a reminder.
    '''
    # '\tto': the original literal '\to be customized' was read as a TAB
    # followed by "o be customized".
    print('\tto be customized')
173 
174 
def PrepareJobUser(self, value)
def batchmanager.BatchManager.RunningMode (   self,
  batch 
)
Return "LXPLUS", "PSI", "NAF", "IC", or "LOCAL":

"LXPLUS" : batch command is bsub, and logged on lxplus
"PSI"    : batch command is qsub, and logged to t3uiXX
"NAF"    : batch command is qsub, and logged on naf
"IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
"LOCAL"  : batch command is nohup.

In all other cases, a ValueError is raised

Definition at line 243 of file batchmanager.py.

Referenced by cmsBatch.MyBatchManager.PrepareJobUser(), and heppy_batch.MyBatchManager.PrepareJobUser().

def RunningMode(self, batch):
    '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

    "LXPLUS" : batch command is bsub, and logged on lxplus
    "PSI"    : batch command is qsub, and logged to t3uiXX
    "NAF"    : batch command is qsub, and logged on naf
    "IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
    "LOCAL"  : batch command is nohup or ./batchScript.sh

    batch -- the full batch command line; only its first word is examined.

    In all other cases (unknown command, or a command that does not match
    the current host) a ValueError is raised.
    '''
    # Function-scope import: only used for the host-name fallback below.
    import socket

    # HOSTNAME is a shell variable that is not always exported to child
    # processes; fall back to the system call rather than fail with KeyError.
    hostName = os.environ.get('HOSTNAME') or socket.gethostname()

    onLxplus = hostName.startswith('lxplus')
    onPSI = hostName.startswith('t3ui')
    onNAF = hostName.startswith('naf')
    # onIC was tested below without ever being defined (NameError).
    # NOTE(review): the domain is taken from the docstring above; confirm
    # that IC login nodes report a fully qualified host name.
    onIC = 'hep.ph.ic.ac.uk' in hostName

    batchCmd = batch.split()[0]

    if batchCmd == 'bsub':
        if not onLxplus:
            err = 'Cannot run %s on %s' % (batchCmd, hostName)
            raise ValueError(err)
        print('running on LSF : %s from %s' % (batchCmd, hostName))
        return 'LXPLUS'

    elif batchCmd == "qsub":
        if onPSI:
            print('running on SGE : %s from %s' % (batchCmd, hostName))
            return 'PSI'
        elif onNAF:
            print('running on NAF : %s from %s' % (batchCmd, hostName))
            return 'NAF'
        elif onIC:
            print('running on IC : %s from %s' % (batchCmd, hostName))
            return 'IC'
        else:
            err = 'Cannot run %s on %s' % (batchCmd, hostName)
            raise ValueError(err)

    elif batchCmd == 'nohup' or batchCmd == './batchScript.sh':
        print('running locally : %s on %s' % (batchCmd, hostName))
        return 'LOCAL'
    else:
        err = 'unknown batch command: X%sX' % batchCmd
        raise ValueError(err)
292 
def RunningMode(self, batch)
def batchmanager.BatchManager.SubmitJob (   self,
  jobDir 
)
Hook for job submission.

Definition at line 194 of file batchmanager.py.

Referenced by batchmanager.BatchManager.SubmitJobs().

def SubmitJob(self, jobDir):
    '''Hook for job submission.

    jobDir -- directory of the job being submitted; only printed here.

    This default implementation runs the batch command given by the
    --batch option through the shell.
    '''
    print('submitting (to be customized):  %s' % jobDir)
    batchCommand = self.options_.batch
    os.system(batchCommand)
198 
199 
def SubmitJob(self, jobDir)
def batchmanager.BatchManager.SubmitJobArray (   self,
  numbOfJobs = 1 
)
Hook for array job submission.

Definition at line 200 of file batchmanager.py.

def SubmitJobArray(self, numbOfJobs=1):
    '''Hook for array job submission.

    numbOfJobs -- number of jobs in the array.

    This default implementation only reports how many jobs it was given.
    '''
    message = 'Submitting array with %s jobs' % numbOfJobs
    print(message)
203 
def SubmitJobArray(self, numbOfJobs=1)
def batchmanager.BatchManager.SubmitJobs (   self,
  waitingTimeInSec = 0 
)
Submit all jobs. Possibly wait between each job

Definition at line 175 of file batchmanager.py.

References reco.if(), batchmanager.BatchManager.listOfJobs_, and batchmanager.BatchManager.SubmitJob().

def SubmitJobs(self, waitingTimeInSec=0):
    '''Submit all jobs listed in self.listOfJobs_, possibly waiting between jobs.

    waitingTimeInSec -- pause, in seconds, after each submission.

    Nothing is submitted when the --negate option is set. Each job is
    submitted through SubmitJob from inside its own job directory; the
    original working directory is restored afterwards, even when
    SubmitJob raises.
    '''
    if self.options_.negate:
        print('*NOT* SUBMITTING JOBS - exit ')
        return

    print('SUBMITTING JOBS ======== ')
    for jobDir in self.listOfJobs_:
        root = os.getcwd()
        # run it
        print('processing  %s' % jobDir)
        os.chdir(jobDir)
        try:
            self.SubmitJob(jobDir)
        finally:
            # and come back -- also on failure, so that an exception does
            # not leave the process inside a job directory
            os.chdir(root)
        print('waiting %s seconds...' % waitingTimeInSec)
        time.sleep(waitingTimeInSec)
        print('done.')
193 
if(dp >Float(M_PI)) dp-
def SubmitJobs(self, waitingTimeInSec=0)
def SubmitJob(self, jobDir)

Member Data Documentation

batchmanager.BatchManager.listOfJobs_

Definition at line 112 of file batchmanager.py.

Referenced by batchmanager.BatchManager.SubmitJobs().

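batchmanager.BatchManager.outputDir_

Definition at line 140 of file batchmanager.py.

Referenced by batchmanager.BatchManager.PrepareJob().
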
batchmanager.BatchManager.parser_

Definition at line 31 of file batchmanager.py.

batchmanager.BatchManager.remoteOutputDir_

Definition at line 58 of file batchmanager.py.

batchmanager.BatchManager.remoteOutputFile_

Definition at line 105 of file batchmanager.py.