production_tasks.py
1 from __future__ import print_function
2 from __future__ import absolute_import
3 
4 from builtins import range
5 import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
6 import glob
7 import gzip
8 import errno
9 from .edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
10 from .addToDatasets import addToDatasets
11 
12 from . import eostools as castortools
13 from . import das as Das
14 
15 from .dataset import Dataset
16 from .datasetToSource import createDataset
17 from .castorBaseDir import castorBaseDir
18 import six
19 
20 def mkdir_p(path):
21  try:
22  os.makedirs(path)
23  except OSError as exc: # Python >2.5
24  if exc.errno == errno.EEXIST:
25  pass
26  else: raise
27 
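For reference, a minimal sketch of the same behaviour using the flag available in Python 3.2 and later (an alternative, not the module's own helper):

import os

def mkdir_p(path):
    # exist_ok=True silently accepts an already existing directory;
    # any other OSError is still raised.
    os.makedirs(path, exist_ok=True)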
28 class Task(object):
29  """Base class for Task API"""
30  def __init__(self, name, dataset, user, options, instance = None):
31  self.name = name
32  self.instance = instance
33  self.dataset = dataset
34  self.user = user
35  self.options = options
36  def getname(self):
37  """The name of the object, using the instance if needed"""
38  if self.instance is not None:
39  return '%s_%s' % (self.name,self.instance)
40  else:
41  return self.name
42  def addOption(self, parser):
43  """A hook for adding things to the parser"""
44  pass
45  def run(self, input):
46  """Basic API for a task. input and output are dictionaries"""
47  return {}
48 
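Each task reads what earlier tasks produced from its input dictionary and returns a result dictionary of its own. A hypothetical driver loop, sketched here only to illustrate the chaining (the names runTasks and results are assumptions, not part of this module):

def runTasks(tasks):
    results = {}
    for task in tasks:
        # each task sees the accumulated results of the tasks before it,
        # keyed by task name, e.g. results['CreateJobDirectory']['JobDir']
        results[task.getname()] = task.run(results)
    return results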
49 class ParseOptions(Task):
50  """Common options for the script __main__: used by all production tasks"""
51  def __init__(self, dataset, user, options):
52  Task.__init__(self,'ParseOptions', dataset, user, options)
53 
54  usage = """%prog [options] <dataset>
55 
56 The %prog script aims to take a list of samples and process them on the batch system. Submission
57 may be done serially (by setting --max_threads to 1), or in parallel (the default).
58 
59 The basic flow is:
60 
61  1) Check that the sample to run on exists
62  2) Generate a source CFG
63  3) Run locally and check everything works with a small number of events
64  4) Submit to the batch system
65  5) Wait until the jobs are finished
66  6) Check the jobs ran OK and that the files are good
67 
68 Example:
69 
70 ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2
71 
72 It is often useful to store the sample names in a file, in which case you could instead do:
73 
74 ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`
75 
76 An example file might contain:
77 
78 palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
79 benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
80 wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
81 
82 The CASTOR username for each sample is given before the '%'.
83 
84 Each step in the flow has a task associated with it, which may set options. The options for each task are
85 documented below.
86 
87 """
88  self.das = Das.DASOptionParser(usage=usage)
89  def addOption(self, parser):
90  parser.add_option("-u", "--user", dest="user", default=os.getlogin(),help='The username to use when looking at mass storage devices. Your login username is used by default.')
91  parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root',help='A UNIX style wildcard to specify which input files to check before submitting the jobs')
92  parser.add_option("--max_threads", dest="max_threads", default=None,help='The maximum number of threads to use in the production')
93  def run(self, input):
94  self.options, self.dataset = self.das.get_opt()
95  self.dataset = [d for d in self.dataset if not d.startswith('#')]
96  self.user = self.options.user
97  if not self.dataset:
98  raise Exception('TaskError: No dataset specified')
99  return {'Options':self.options, 'Dataset':self.dataset}
100 
101 class CheckDatasetExists(Task):
102  """Use 'datasets.py' to check that the dataset exists in the production system.
103  """
104  def __init__(self, dataset, user, options):
105  Task.__init__(self,'CheckDatasetExists', dataset, user, options)
106  def run(self, input):
107  pattern = fnmatch.translate(self.options.wildcard)
108  run_range = (self.options.min_run, self.options.max_run)
109  data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
110  if( len(data.listOfGoodFiles()) == 0 ):
111  raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
112  self.dataset,
113  self.options.wildcard,
114  run_range))
115  return {'Dataset':self.dataset}
116 
117 class BaseDataset(Task):
118  """Query DAS to find dataset name in DBS - see https://cmsweb.cern.ch/das/"""
119  def __init__(self, dataset, user, options):
120  Task.__init__(self,'BaseDataset', dataset, user, options)
121  def addOption(self, parser):
122  parser.add_option("-n", "--name", dest="name", default=None,help='The name of the dataset in DAS. Will be guessed if not specified')
123  def query(self, dataset):
124  """Query DAS to find out how many events are in the dataset"""
125 
126  host = self.options.host
127  debug = self.options.verbose
128  idx = self.options.idx
129  limit = self.options.limit
130 
131  def check(ds):
132  query = 'dataset=%s' % ds
133  result = Das.get_data(host, query, idx, limit, debug)
134  result = result.replace('null','None')
135  result = result.replace('true','True')
136  result = result.replace('false','False')
137  data = eval(result)
138  if data['status'] != 'ok':
139  raise Exception("Das query failed: Output is '%s'" % data)
140  return (data['data'],data)
141 
142  data = None
143  exists = False
144 
145  if self.options.name is None:
146  #guess the dataset name in DBS
147  tokens = [t for t in dataset.split(os.sep) if t]
148  if len(tokens) >= 3:
149  #DBS names always have three entries
150  ds = os.sep + os.sep.join(tokens[0:3])
151  if ds:
152  exists, data = check(ds)
153  self.options.name = ds
154  else:
155  exists, data = check(self.options.name)
156  if not exists:
157  raise Exception("Specified dataset '%s' not found in Das. Please check." % self.options.name)
158 
159  if data is None:
160  raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
161  return data
162 
163  def run(self, input):
164  output = {}
165  if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
166  output = self.query(self.dataset)
167  return {'Name':self.options.name,'Das':output}
168 
169 class GZipFiles(Task):
170  """GZip a list of files"""
171  def __init__(self, dataset, user, options):
172  Task.__init__(self,'GZipFiles', dataset, user, options)
173  def gzip(self, fileName):
174  output = '%s.gz' % fileName
175 
176  f_in = open(fileName, 'rb')
177  f_out = gzip.open(output, 'wb')
178  f_out.writelines(f_in)
179  f_out.close()
180  f_in.close()
181  #remove the original file once we've gzipped it
182  os.remove(fileName)
183  return output
184 
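The gzip helper above copies the file with writelines and then deletes the original. A streaming variant with the same outcome, sketched with shutil.copyfileobj (an alternative, not the code used here):

import gzip, os, shutil

def gzip_file(fileName):
    output = '%s.gz' % fileName
    with open(fileName, 'rb') as f_in, gzip.open(output, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)   # copy in fixed-size chunks
    os.remove(fileName)                   # drop the original once compressed
    return output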
185  def run(self, input):
186  files = input['FilesToCompress']['Files']
187 
188  compressed = []
189  for f in files:
190  if f is None or not f: continue
191  if os.path.exists(f):
192  gz = self.gzip(f)
193  compressed.append(gz)
194  return {'CompressedFiles':compressed}
195 
196 class CleanFiles(Task):
197  """Remove a list of files"""
198  def __init__(self, dataset, user, options):
199  Task.__init__(self,'CleanFiles', dataset, user, options)
200  def run(self, input):
201  files = input['FilesToClean']['Files']
202  removed = []
203  for f in files:
204  if f is None or not f: continue
205  if os.path.exists(f): os.remove(f)
206  removed.append(f)
207  return {'CleanedFiles':removed}
208 
209 class FindOnCastor(Task):
210  """Checks that the sample specified exists in the CASTOR area of the user specified. The directory must exist."""
211  def __init__(self, dataset, user, options):
212  Task.__init__(self,'FindOnCastor', dataset, user, options)
213  def run(self, input):
214  if self.user == 'CMS':
215  return {'Topdir':None,'Directory':None}
216  topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
217  directory = '%s/%s' % (topdir,self.dataset)
218  # directory = directory.replace('//','/')
219  if not castortools.fileExists(directory):
220  if hasattr(self,'create') and self.create:
221  castortools.createCastorDir(directory)
222  #castortools.chmod(directory,'775')
223  if not castortools.isDirectory(directory):
224  raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
225  return {'Topdir':topdir,'Directory':directory}
226 
227 class CheckForMask(Task):
228  """Tests if a file mask, created by edmIntegrityCheck.py, is present already and reads it if so."""
229  def __init__(self, dataset, user, options):
230  Task.__init__(self,'CheckForMask', dataset, user, options)
231  def addOption(self, parser):
232  parser.add_option("-c", "--check", dest="check", default=False, action='store_true',help='Check filemask if available')
233  def run(self, input):
234  #skip for DBS
235  if self.user == 'CMS':
236  return {'MaskPresent':True,'Report':'Files taken from DBS'}
237 
238  dir = input['FindOnCastor']['Directory']
239  mask = "IntegrityCheck"
240  file_mask = []
241 
242  report = None
243  if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
244  file_mask = castortools.matchingFiles(dir, '^%s_.*\.txt$' % mask)
245 
246  if file_mask:
247  p = PublishToFileSystem(mask)
248  report = p.get(dir)
249  return {'MaskPresent':report is not None,'Report':report}
250 
251 class CheckForWrite(Task):
252  """Checks whether you have write access to the CASTOR directory specified"""
253  def __init__(self, dataset, user, options):
254  Task.__init__(self,'CheckForWrite', dataset, user, options)
255  def run(self, input):
256  """Check that the directory is writable"""
257  if self.user == 'CMS':
258  return {'Directory':None,'WriteAccess':True}
259  dir = input['FindOnCastor']['Directory']
260  if self.options.check:
261 
262  _, name = tempfile.mkstemp('.txt',text=True)
263  testFile = open(name,'w')
264  testFile.write('Test file')
265  testFile.close()
266 
267  store = castortools.castorToLFN(dir)
268  #this is bad, but castortools is giving me problems
269  if not os.system('cmsStage %s %s' % (name,store)):
270  fname = '%s/%s' % (dir,os.path.basename(name))
271  write = castortools.fileExists(fname)
272  if write:
273  castortools.rm(fname)
274  else:
275  raise Exception("Failed to write to directory '%s'" % dir)
276  os.remove(name)
277  return {'Directory':dir,'WriteAccess':True}
278 
279 class GenerateMask(Task):
280  """Uses edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
281  def __init__(self, dataset, user, options):
282  Task.__init__(self,'GenerateMask', dataset, user, options)
283  def addOption(self, parser):
284  parser.add_option("-r", "--recursive", dest="resursive", default=False, action='store_true',help='Walk the mass storage device recursively')
285  parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true',help='Print a report to stdout')
286  def run(self, input):
287 
288  report = None
289  if self.options.check and not input['CheckForMask']['MaskPresent']:
290 
291  options = copy.deepcopy(self.options)
292  options.user = self.user
293 
294  if 'BaseDataset' in input:
295  options.name = input['BaseDataset']['Name']
296  else:
297  options.name = None
298 
299  check = IntegrityCheck(self.dataset,options)
300  check.test()
301  report = check.structured()
302  pub = PublishToFileSystem(check)
303  pub.publish(report)
304  elif input['CheckForMask']['MaskPresent']:
305  report = input['CheckForMask']['Report']
306 
307  return {'MaskPresent':report is not None,'Report':report}
308 
309 class CreateJobDirectory(Task):
310  """Generates a job directory on your local drive"""
311  def __init__(self, dataset, user, options):
312  Task.__init__(self,'CreateJobDirectory', dataset, user, options)
313  def addOption(self, parser):
314  parser.add_option("-o","--output", dest="output", default=None,help='The directory to use locally for job files')
315  def run(self, input):
316  if self.options.output is not None:
317  output = self.options.output
318  else:
319  # output = '%s_%s' % (self.dataset.replace('/','.'),datetime.datetime.now().strftime("%s"))
320  # if output.startswith('.'):
321  output = '%s_%s' % (self.dataset,datetime.datetime.now().strftime("%s"))
322  output = output.lstrip('/')
323  if not os.path.exists(output):
324  mkdir_p(output)
325  return {'JobDir':output,'PWD':os.getcwd()}
326 
327 class SourceCFG(Task):
328  """Generate a source CFG using 'sourceFileList.py' by listing the CASTOR directory specified. Applies the file wildcard, '--wildcard'"""
329  def __init__(self, dataset, user, options):
330  Task.__init__(self,'SourceCFG', dataset, user, options)
331  def addOption(self, parser):
332  parser.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= this run')
333  parser.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= this run')
334  parser.add_option("--input-prescale", dest="prescale", default=1, type=int, help='Randomly prescale the number of good files by this factor.')
335  def run(self, input):
336 
337  jobdir = input['CreateJobDirectory']['JobDir']
338  pattern = fnmatch.translate(self.options.wildcard)
339 
340  run_range = (self.options.min_run, self.options.max_run)
341  data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
342  good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
343  #will mark prescale removed files as bad in comments
344  bad_files = [fname for fname in data.listOfFiles() if not fname in good_files]
345 
346  source = os.path.join(jobdir,'source_cfg.py')
347  output = open(source,'w')
348  output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files),len(bad_files),self.options.prescale) )
349  output.write('files = ' + str(good_files) + '\n')
350  for bad_file in bad_files:
351  output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
352  output.close()
353  return {'SourceCFG':source}
354 
355 
356 def insertLines( insertedTo, toInsert ):
357  '''Insert a sequence into another sequence.
358 
359  The sequence is inserted just before the HOOK line if one is
360  found, and appended at the end otherwise.
361  The HOOK is considered found if
362  str(elem).find('###ProductionTaskHook$$$') > -1
363  for one of the elements of the insertedTo sequence.
364  '''
365  HOOK = '###ProductionTaskHook$$$'
366  hookIndex = None
367  for index, line in enumerate(insertedTo):
368  line = str(line)
369  if line.find(HOOK)>-1:
370  hookIndex = index
371  break
372  if hookIndex is not None:
373  before = insertedTo[:hookIndex]
374  after = insertedTo[hookIndex:]
375  result = before + toInsert + after
376  return result
377  else:
378  insertedTo.extend( toInsert )
379  return insertedTo
380 
381 
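A short usage illustration of the hook behaviour described above; the cfg lines are made up for the example:

cfg = ['a = 1\n', '###ProductionTaskHook$$$\n', 'b = 2\n']
print(insertLines(cfg, ['x = 0\n']))
# -> ['a = 1\n', 'x = 0\n', '###ProductionTaskHook$$$\n', 'b = 2\n']
print(insertLines(['a = 1\n'], ['x = 0\n']))
# no hook present -> ['a = 1\n', 'x = 0\n']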
382 class FullCFG(Task):
383  """Generates the full CFG needed to run the job and writes it to the job directory"""
384  def __init__(self, dataset, user, options):
385  Task.__init__(self,'FullCFG', dataset, user, options)
386  def addOption(self, parser):
387  parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
388  parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
389  def run(self, input):
390 
391  jobdir = input['CreateJobDirectory']['JobDir']
392 
393  if self.options.cfg is None or not os.path.exists(self.options.cfg):
394  raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)
395 
396  config = open(self.options.cfg).readlines()
397  sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
398  if sourceFile.lower().endswith('.py'):
399  sourceFile = sourceFile[:-3]
400 
401  source = os.path.join(jobdir,'full_cfg.py')
402  output = open(source,'w')
403 
404  nEventsPerJob = -1
405  if self.options.nEventsPerJob:
406  nEventsPerJob = int(self.options.nEventsPerJob)
407 
408  toInsert = ['\nfrom %s import *\n' % sourceFile,
409  'process.source.fileNames = files\n',
410  'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
411  'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n'
412  'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard) )
413  ]
414  config = insertLines( config, toInsert )
415  output.writelines(config)
416  output.close()
417  return {'FullCFG':source}
418 
419 class CheckConfig(Task):
420  """Check the basic syntax of a CFG file by running python on it."""
421  def __init__(self, dataset, user, options):
422  Task.__init__(self,'CheckConfig', dataset, user, options)
423  def run(self, input):
424 
425  full = input['FullCFG']['FullCFG']
426 
427  child = subprocess.Popen(['python',full], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
428  stdout, stderr = child.communicate()
429  if child.returncode != 0:
430  raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr,child.returncode))
431  return {'Status':'VALID'}
432 
433 class RunTestEvents(Task):
434  """Run cmsRun on the job CFG, but with only a small number of events."""
435 
436  def __init__(self, dataset, user, options):
437  Task.__init__(self,'RunTestEvents', dataset, user, options)
438  def run(self, input):
439 
440  full = input['FullCFG']['FullCFG']
441  jobdir = input['CreateJobDirectory']['JobDir']
442 
443  config = open(full).readlines()
444  source = os.path.join(jobdir,'test_cfg.py')
445  output = open(source,'w')
446  toInsert = ['\n',
447  'process.maxEvents.input = cms.untracked.int32(5)\n',
448  'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
449  ]
450  config = insertLines( config, toInsert )
451  output.writelines(config)
452  output.close()
453 
454  pwd = os.getcwd()
455 
456  error = None
457  try:
458  os.chdir(jobdir)
459 
460  child = subprocess.Popen(['cmsRun',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
461  stdout, stderr = child.communicate()
462 
463  if child.returncode != 0:
464  error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr,child.returncode)
465  finally:
466  os.chdir(pwd)
467 
468  if error is not None:
469  raise Exception(error)
470 
471  return {'Status':'VALID','TestCFG':source}
472 
473 class ExpandConfig(Task):
474  """Produces an expanded cfg file by dumping process.dumpPython() (equivalent to edmConfigDump)"""
475 
476  def __init__(self, dataset, user, options):
477  Task.__init__(self,'ExpandConfig', dataset, user, options)
478  def run(self, input):
479 
480  full = input['FullCFG']['FullCFG']
481  jobdir = input['CreateJobDirectory']['JobDir']
482 
483  config = open(full).read()
484  source = os.path.join(jobdir,'test_cfg.py')
485  expanded = 'Expanded%s' % os.path.basename(full)
486  output = open(source,'w')
487  output.write(config)
488  output.write("open('%s','w').write(process.dumpPython())\n" % expanded)
489  output.close()
490 
491  pwd = os.getcwd()
492 
493  result = {}
494  error = None
495  try:
496  os.chdir(jobdir)
497 
498  child = subprocess.Popen(['python',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
499  stdout, stderr = child.communicate()
500 
501  if child.returncode != 0:
502  error = "Failed to edmConfigDump. Error was '%s' (%i)." % (stderr,child.returncode)
503  result['ExpandedFullCFG'] = os.path.join(jobdir,expanded)
504 
505  finally:
506  os.chdir(pwd)
507 
508  if error is not None:
509  raise Exception(error)
510 
511  return result
512 
513 class WriteToDatasets(Task):
514  """Publish the sample to 'Datasets.txt' if required"""
515  def __init__(self, dataset, user, options):
516  Task.__init__(self,'WriteToDatasets', dataset, user, options)
517  def run(self, input):
518  name = "%s/%s" % (self.dataset,self.options.tier)
519  name = name.replace('//','/')
520  user = self.options.batch_user
521  added = addToDatasets(name, user = user)
522  return {'Added':added, 'Name':name, 'User':user}
523 
524 class RunCMSBatch(Task):
525  """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""
526 
527  def __init__(self, dataset, user, options):
528  Task.__init__(self,'RunCMSBatch', dataset, user, options)
529  def addOption(self, parser):
530  parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
531  parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true',help='Run on the batch system')
532  parser.add_option("-N", "--numberOfInputFiles", dest="nInput",help="Number of input files per job",default=5,type=int)
533  parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
534  parser.add_option("-t", "--tier", dest="tier",
535  help="Tier: extension you can give to specify you are doing a new production. If you give a Tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
536  default="")
537  parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)
538 
539  def run(self, input):
540  find = FindOnCastor(self.dataset,self.options.batch_user,self.options)
541  find.create = True
542  out = find.run({})
543 
544  full = input['ExpandConfig']['ExpandedFullCFG']
545  jobdir = input['CreateJobDirectory']['JobDir']
546 
547  sampleDir = os.path.join(out['Directory'],self.options.tier)
548  sampleDir = castortools.castorToLFN(sampleDir)
549 
550  cmd = ['cmsBatch.py',str(self.options.nInput),os.path.basename(full),'-o','%s_Jobs' % self.options.tier,'--force']
551  cmd.extend(['-r',sampleDir])
552  if self.options.run_batch:
553  jname = "%s/%s" % (self.dataset,self.options.tier)
554  jname = jname.replace("//","/")
555  user_group = ''
556  if self.options.group is not None:
557  user_group = '-G %s' % self.options.group
558  cmd.extend(['-b',"'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue,jname,user_group)])
559  print(" ".join(cmd))
560 
561  pwd = os.getcwd()
562 
563  error = None
564  try:
565  os.chdir(jobdir)
566  returncode = os.system(" ".join(cmd))
567 
568  if returncode != 0:
569  error = "Running cmsBatch failed. Return code was %i." % returncode
570  finally:
571  os.chdir(pwd)
572 
573  if error is not None:
574  raise Exception(error)
575 
576  return {'SampleDataset':"%s/%s" % (self.dataset,self.options.tier),'BatchUser':self.options.batch_user,
577  'SampleOutputDir':sampleDir,'LSFJobsTopDir':os.path.join(jobdir,'%s_Jobs' % self.options.tier)}
578 
579 class MonitorJobs(Task):
580  """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
581  def __init__(self, dataset, user, options):
582  Task.__init__(self,'MonitorJobs', dataset, user, options)
583 
584  def getjobid(self, job_dir):
585  """Parse the LSF output to find the job id"""
586  input = os.path.join(job_dir,'job_id.txt')
587  result = None
588  if os.path.exists(input):
589  contents = open(input).read()
590  for c in contents.split('\n'):
591  if c and re.match('^Job <\\d*> is submitted to queue <.*>',c) is not None:
592  try:
593  result = c.split('<')[1].split('>')[0]
594  except Exception as e:
595  print('Job ID parsing error',str(e),c, file=sys.stderr)
596  return result
597 
598  def monitor(self, jobs, previous):
599 
600  #executes bjobs with a list of job IDs
601  cmd = ['bjobs','-u',self.options.batch_user]
602  cmd.extend([v for v in jobs.values() if v is not None])#filter out unknown IDs
603  child = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
604  stdout, stderr = child.communicate()
605 
606  def parseHeader(header):
607  """Parse the header from bjobs"""
608  tokens = [t for t in header.split(' ') if t]
609  result = {}
610  for i in range(len(tokens)):
611  result[tokens[i]] = i
612 
613  return result
614 
615  result = {}
616  if stdout:
617  lines = stdout.split('\n')
618  if lines:
619  header = parseHeader(lines[0])
620  if not 'STAT' in header or not 'JOBID' in header:
621  print('Problem parsing bjobs header\n',lines, file=sys.stderr)
622  return result
623  for line in lines[1:]:
624  #TODO: Unreliable for some fields, e.g. dates
625  tokens = [t for t in line.split(' ') if t]
626  if len(tokens) < len(header): continue
627  id = tokens[header['JOBID']]
628  user = tokens[header['USER']]
629  status = tokens[header['STAT']]
630 
631  result[id] = status
632 
633  if stderr:
634  lines = stderr.split('\n')
635  if lines:
636  for line in lines:
637  if line and re.match('^Job <\\d*> is not found',line) is not None:
638  try:
639  id = line.split('<')[1].split('>')[0]
640  if id not in result and id not in previous:
641  result[id] = 'FORGOTTEN'
642  except Exception as e:
643  print('Job ID parsing error in STDERR',str(e),line, file=sys.stderr)
644 
645  #after one hour the status is no longer available
646  if result:
647  for id in jobs.values():
648  if id not in result and id in previous:
649  result[id] = previous[id]
650  return result
651 
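monitor() finds the JOBID and STAT columns through the bjobs header instead of assuming fixed positions. A small sketch of that lookup on made-up bjobs output (the sample text is illustrative only):

sample_header = "JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME"
sample_line   = "123456  cbern   RUN   8nh        lxplus001   lxbatch42   job1       Oct  1 10:00"
header = {t: i for i, t in enumerate([t for t in sample_header.split(' ') if t])}
tokens = [t for t in sample_line.split(' ') if t]
print({tokens[header['JOBID']]: tokens[header['STAT']]})   # -> {'123456': 'RUN'}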
652  def run(self, input):
653 
654  # return #COLIN
655  jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
656  if not os.path.exists(jobsdir):
657  raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)
658 
659  subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
660  jobs = {}
661  for s in subjobs:
662  jobs[s] = self.getjobid(s)
663 
664  def checkStatus(stat):
665 
666  #gzip files on the fly
667  actions = {'FilesToCompress':{'Files':[]}}
668 
669  result = {}
670  for j, id in six.iteritems(jobs):
671  if id is None:
672  result[j] = 'UNKNOWN'
673  else:
674  if id in stat:
675  result[j] = stat[id]
676  if result[j] in ['DONE','EXIT','FORGOTTEN']:
677  stdout = os.path.join(j,'LSFJOB_%s' % id,'STDOUT')
678  if os.path.exists(stdout):
679  #compress this file
680  actions['FilesToCompress']['Files'].append(stdout)
681  result[j] = '%s.gz' % stdout
682  elif os.path.exists('%s.gz' % stdout):
683  result[j] = '%s.gz' % stdout
684  else:
685  result[j] = 'NOSTDOUT'
686 
687  #also compress the stderr, although this is mostly empty
688  stderr = os.path.join(j,'LSFJOB_%s' % id,'STDERR')
689  if os.path.exists(stderr):
690  #compress this file
691  actions['FilesToCompress']['Files'].append(stderr)
692 
693  compress = GZipFiles(self.dataset,self.user,self.options)
694  compress.run(actions)
695  return result
696 
697  def countJobs(stat):
698  """Count jobs that are monitorable - i.e. not in a final state"""
699  result = []
700  for j, id in six.iteritems(jobs):
701  if id is not None and id in stat:
702  st = stat[id]
703  if st in ['PEND','PSUSP','RUN','USUSP','SSUSP','WAIT']:
704  result.append(id)
705  return result
706 
707  def writeKillScript(mon):
708  """Write a shell script to kill the jobs we know about"""
709  kill = os.path.join(jobsdir,'kill_jobs.sh')
710  output = open(kill,'w')
711  script = """
712 #!/usr/bin/env bash
713 echo "Killing jobs"
714 bkill -u %s %s
715  """ % (self.options.batch_user," ".join(mon))
716  output.write(script)
717  output.close()
718  return mon
719 
720  #continue monitoring while there are jobs to monitor
721  status = self.monitor(jobs,{})
722  monitorable = writeKillScript(countJobs(status))
723  count = 0
724 
725  while monitorable:
726  job_status = checkStatus(status)
727  time.sleep(60)
728  status = self.monitor(jobs,status)
729  monitorable = writeKillScript(countJobs(status))
730  if not (count % 3):
731  print('%s: Monitoring %i jobs (%s)' % (self.name,len(monitorable),self.dataset))
732  count += 1
733 
734  return {'LSFJobStatus':checkStatus(status),'LSFJobIDs':jobs}
735 
736 class CheckJobStatus(Task):
737  """Checks the job STDOUT to catch common problems like exceptions, CPU time exceeded. Sets the job status in the report accordingly."""
738  def __init__(self, dataset, user, options):
739  Task.__init__(self,'CheckJobStatus', dataset, user, options)
740  def addOption(self, parser):
741  parser.add_option("--output_wildcard", dest="output_wildcard", help="The wildcard to use when testing the output of this production (defaults to same as -w)", default=None)
742  def run(self, input):
743 
744  job_status = input['MonitorJobs']['LSFJobStatus']
745 
746  result = {}
747  for j, status in six.iteritems(job_status):
748  valid = True
749  if os.path.exists(status):
750 
751  fileHandle = None
752  if status.endswith('.gz') or status.endswith('.GZ'):
753  fileHandle = gzip.GzipFile(status)
754  else:
755  fileHandle = open(status)
756 
757  open_count = 0
758  close_count = 0
759  for line in fileHandle:
760  #start by counting files opened and closed
761  #suggestion from Enrique
762  if 'pened file' in line:
763  open_count += 1
764  if 'losed file' in line:
765  close_count += 1
766 
767  if 'Exception' in line:
768  result[j] = 'Exception'
769  valid = False
770  break
771  elif 'CPU time limit exceeded' in line:
772  result[j] = 'CPUTimeExceeded'
773  valid = False
774  break
775  elif 'Killed' in line:
776  result[j] = 'JobKilled'
777  valid = False
778  break
779  elif 'A fatal system signal has occurred' in line:
780  result[j] = 'SegFault'
781  valid = False
782  break
783 
784  if valid and open_count != close_count:
785  result[j] = 'FileOpenCloseMismatch'
786  valid = False
787  if valid:
788  result[j] = 'VALID'
789  else:
790  result[j] = status
791 
792  #allows a different wildcard in the final check.
793  options = copy.deepcopy(self.options)
794  if self.options.output_wildcard is not None:
795  options.wildcard = self.options.output_wildcard
796 
797  mask = GenerateMask(input['RunCMSBatch']['SampleDataset'],self.options.batch_user,options)
798  report = mask.run({'CheckForMask':{'MaskPresent':False}})
799  report['LSFJobStatusCheck'] = result
800  return report
801 
802 class WriteJobReport(Task):
803  """Write a summary report on each job"""
804  def __init__(self, dataset, user, options):
805  Task.__init__(self,'WriteJobReport', dataset, user, options)
806  def run(self, input):
807 
808  report = input['CheckJobStatus']
809 
810  #collect a list of jobs by status
811  states = {}
812  for j, status in six.iteritems(report['LSFJobStatusCheck']):
813  if status not in states:
814  states[status] = []
815  states[status].append(j)
816  jobdir = input['CreateJobDirectory']['PWD']
817  if not os.path.exists(jobdir):
818  raise Exception("Top level job directory not found: '%s'" % jobdir)
819  report_file = os.path.join(input['CreateJobDirectory']['JobDir'],'resubmit.sh')
820 
821  output = open(report_file,'w')
822  output.write('#!/usr/bin/env bash\n')
823 
824  if report['MaskPresent']:
825  mask = report['Report']
826  output.write('#PrimaryDatasetFraction: %f\n' % mask['PrimaryDatasetFraction'])
827  output.write('#FilesGood: %i\n' % mask['FilesGood'])
828  output.write('#FilesBad: %i\n' % mask['FilesBad'])
829 
830  user_group = ''
831  if self.options.group is not None:
832  user_group = '-G %s' % self.options.group
833 
834  for status, jobs in six.iteritems(states):
835  output.write('# %d jobs found in state %s\n' % (len(jobs),status) )
836  if status == 'VALID':
837  continue
838  for j in jobs:
839  jdir = os.path.join(jobdir,j)
840  output.write('pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir,self.options.queue,user_group))
841  output.close()
842 
843  return {'SummaryFile':report_file}
844 
845 class CleanJobFiles(Task):
846  """Removes and compresses auto-generated files from the job directory to save space."""
847  def __init__(self, dataset, user, options):
848  Task.__init__(self,'CleanJobFiles', dataset, user, options)
849  def run(self, input):
850 
851  jobdir = input['CreateJobDirectory']['JobDir']
852  jobs = input['MonitorJobs']['LSFJobIDs']
853  job_status = input['MonitorJobs']['LSFJobStatus']
854 
855  actions = {'FilesToCompress':{'Files':[]},'FilesToClean':{'Files':[]}}
856 
857  actions['FilesToClean']['Files'].append(input['ExpandConfig']['ExpandedFullCFG'])
858  if 'RunTestEvents' in input:
859  actions['FilesToClean']['Files'].append(input['RunTestEvents']['TestCFG'])
860 
861  for rt in glob.iglob('%s/*.root' % jobdir):
862  actions['FilesToClean']['Files'].append(rt)
863  for pyc in glob.iglob('%s/*.pyc' % jobdir):
864  actions['FilesToClean']['Files'].append(pyc)
865 
866  for j in jobs:
867  status = job_status[j]
868  if os.path.exists(status) and not status.endswith('.gz'):
869  actions['FilesToCompress']['Files'].append(status)
870 
871  compress = GZipFiles(self.dataset,self.user,self.options)
872  compressed = compress.run(actions)
873 
874  clean = CleanFiles(self.dataset,self.user,self.options)
875  removed = clean.run(actions)
876  return {'Cleaned':removed,'Compressed':compressed}
877 