production_tasks.py
import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
import glob
import gzip
import errno
from edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
from addToDatasets import addToDatasets

import eostools as castortools
import das as Das

from dataset import Dataset
from datasetToSource import createDataset
from castorBaseDir import castorBaseDir

def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as exc: # Python >2.5
        if exc.errno == errno.EEXIST:
            pass
        else:
            raise

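# Illustrative usage (not part of the original file): unlike a bare
# os.makedirs call, mkdir_p does not raise if the directory already exists.
#
# mkdir_p('Jobs/test')   # creates Jobs/ and Jobs/test as needed
# mkdir_p('Jobs/test')   # second call is a harmless no-op
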
class Task(object):
    """Base class for the Task API"""
    def __init__(self, name, dataset, user, options, instance = None):
        self.name = name
        self.instance = instance
        self.dataset = dataset
        self.user = user
        self.options = options
    def getname(self):
        """The name of the object, using the instance if needed"""
        if self.instance is not None:
            return '%s_%s' % (self.name, self.instance)
        else:
            return self.name
    def addOption(self, parser):
        """A hook for adding command line options to the parser"""
        pass
    def run(self, input):
        """Basic API for a task. input and output are dictionaries"""
        return {}

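# A minimal sketch of a custom task (illustrative only, not part of the
# original file): a subclass passes its name to Task.__init__ and overrides
# run(), returning a dictionary that later tasks can read under that name.
#
# class EchoDataset(Task):
#     def __init__(self, dataset, user, options):
#         Task.__init__(self, 'EchoDataset', dataset, user, options)
#     def run(self, input):
#         return {'Dataset': self.dataset}
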
class ParseOptions(Task):
    """Common options for the script __main__: used by all production tasks"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'ParseOptions', dataset, user, options)

        usage = """%prog [options] <dataset>

The %prog script takes a list of samples and processes them on the batch system. Submission
may be done serially (by setting --max_threads to 1), or in parallel (the default).

The basic flow is:

    1) Check that the sample to run on exists
    2) Generate a source CFG
    3) Run locally on a small number of events to check that everything works
    4) Submit to the batch system
    5) Wait until the jobs are finished
    6) Check that the jobs ran OK and that the output files are good

Example:

ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2

It is often useful to store the sample names in a file, in which case you could instead do:

ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`

An example file might contain:

palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2

The CASTOR username for each sample is given before the '%'.

Each step in the flow has a task associated with it, which may set options. The options for each
task are documented below.

"""
        self.das = Das.DASOptionParser(usage=usage)
    def addOption(self, parser):
        parser.add_option("-u", "--user", dest="user", default=os.getlogin(), help='The username to use when looking at mass storage devices. Your login username is used by default.')
        parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root', help='A UNIX-style wildcard to specify which input files to check before submitting the jobs')
        parser.add_option("--max_threads", dest="max_threads", default=None, help='The maximum number of threads to use in the production')
    def run(self, input):
        self.options, self.dataset = self.das.get_opt()
        self.dataset = [d for d in self.dataset if not d.startswith('#')]
        self.user = self.options.user
        if not self.dataset:
            raise Exception('TaskError: No dataset specified')
        return {'Options':self.options, 'Dataset':self.dataset}

class CheckDatasetExists(Task):
    """Use 'datasets.py' to check that the dataset exists in the production system."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckDatasetExists', dataset, user, options)
    def run(self, input):
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        if len(data.listOfGoodFiles()) == 0:
            raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
                                                                                self.dataset,
                                                                                self.options.wildcard,
                                                                                run_range))
        return {'Dataset':self.dataset}

class BaseDataset(Task):
    """Query DAS to find the dataset name in DBS - see https://cmsweb.cern.ch/das/"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'BaseDataset', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-n", "--name", dest="name", default=None, help='The name of the dataset in DAS. Will be guessed if not specified')
    def query(self, dataset):
        """Query DAS to find out how many events are in the dataset"""

        host = self.options.host
        debug = self.options.verbose
        idx = self.options.idx
        limit = self.options.limit

        def check(ds):
            query = 'dataset=%s' % ds
            result = Das.get_data(host, query, idx, limit, debug)
            result = result.replace('null','None')
            result = result.replace('true','True')
            result = result.replace('false','False')
            data = eval(result)
            if data['status'] != 'ok':
                raise Exception("DAS query failed: output is '%s'" % data)
            return (data['data'], data)

        data = None
        exists = False

        if self.options.name is None:
            # guess the dataset name in DBS
            tokens = [t for t in dataset.split(os.sep) if t]
            if len(tokens) >= 3:
                # DBS names always have three entries
                ds = os.sep + os.sep.join(tokens[0:3])
                if ds:
                    exists, data = check(ds)
                    self.options.name = ds
        else:
            exists, data = check(self.options.name)
            if not exists:
                raise Exception("Specified dataset '%s' not found in DAS. Please check." % self.options.name)

        if data is None:
            raise Exception("Dataset '%s' not found in DAS. Please check." % self.dataset)
        return data

    def run(self, input):
        output = {}
        if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
            output = self.query(self.dataset)
        return {'Name':self.options.name, 'Das':output}

class GZipFiles(Task):
    """GZip a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GZipFiles', dataset, user, options)
    def gzip(self, fileName):
        output = '%s.gz' % fileName

        f_in = open(fileName, 'rb')
        f_out = gzip.open(output, 'wb')
        f_out.writelines(f_in)
        f_out.close()
        f_in.close()
        # remove the original file once we've gzipped it
        os.remove(fileName)
        return output

    def run(self, input):
        files = input['FilesToCompress']['Files']

        compressed = []
        for f in files:
            if not f: continue
            if os.path.exists(f):
                gz = self.gzip(f)
                compressed.append(gz)
        return {'CompressedFiles':compressed}

class CleanFiles(Task):
    """Remove a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanFiles', dataset, user, options)
    def run(self, input):
        files = input['FilesToClean']['Files']
        removed = []
        for f in files:
            if not f: continue
            if os.path.exists(f): os.remove(f)
            removed.append(f)
        return {'CleanedFiles':removed}

class FindOnCastor(Task):
    """Checks that the specified sample exists in the CASTOR area of the specified user. The directory must exist."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FindOnCastor', dataset, user, options)
    def run(self, input):
        if self.user == 'CMS':
            return {'Topdir':None, 'Directory':None}
        topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
        directory = '%s/%s' % (topdir, self.dataset)
        # directory = directory.replace('//','/')
        if not castortools.fileExists(directory):
            if hasattr(self,'create') and self.create:
                castortools.createCastorDir(directory)
                #castortools.chmod(directory,'775')
        if not castortools.isDirectory(directory):
            raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
        return {'Topdir':topdir, 'Directory':directory}

class CheckForMask(Task):
    """Tests whether a file mask, created by edmIntegrityCheck.py, is already present, and reads it if so."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-c", "--check", dest="check", default=False, action='store_true', help='Check the file mask if available')
    def run(self, input):
        # skip for DBS
        if self.user == 'CMS':
            return {'MaskPresent':True, 'Report':'Files taken from DBS'}

        dir = input['FindOnCastor']['Directory']
        mask = "IntegrityCheck"
        file_mask = []

        report = None
        if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
            file_mask = castortools.matchingFiles(dir, '^%s_.*\.txt$' % mask)

            if file_mask:
                p = PublishToFileSystem(mask)
                report = p.get(dir)
        return {'MaskPresent':report is not None, 'Report':report}

class CheckForWrite(Task):
    """Checks whether you have write access to the specified CASTOR directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForWrite', dataset, user, options)
    def run(self, input):
        """Check that the directory is writable"""
        if self.user == 'CMS':
            return {'Directory':None, 'WriteAccess':True}
        dir = input['FindOnCastor']['Directory']
        if self.options.check:

            _, name = tempfile.mkstemp('.txt', text=True)
            testFile = file(name,'w')
            testFile.write('Test file')
            testFile.close()

            store = castortools.castorToLFN(dir)
            # this is bad, but castortools is giving me problems
            if not os.system('cmsStage %s %s' % (name, store)):
                fname = '%s/%s' % (dir, os.path.basename(name))
                write = castortools.fileExists(fname)
                if write:
                    castortools.rm(fname)
                else:
                    raise Exception("Failed to write to directory '%s'" % dir)
            os.remove(name)
        return {'Directory':dir, 'WriteAccess':True}

class GenerateMask(Task):
    """Uses edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GenerateMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-r", "--recursive", dest="recursive", default=False, action='store_true', help='Walk the mass storage device recursively')
        parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true', help='Print a report to stdout')
    def run(self, input):

        report = None
        if self.options.check and not input['CheckForMask']['MaskPresent']:

            options = copy.deepcopy(self.options)
            options.user = self.user

            if input.has_key('BaseDataset'):
                options.name = input['BaseDataset']['Name']
            else:
                options.name = None

            check = IntegrityCheck(self.dataset, options)
            check.test()
            report = check.structured()
            pub = PublishToFileSystem(check)
            pub.publish(report)
        elif input['CheckForMask']['MaskPresent']:
            report = input['CheckForMask']['Report']

        return {'MaskPresent':report is not None, 'Report':report}

class CreateJobDirectory(Task):
    """Generates a job directory on your local drive"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CreateJobDirectory', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-o", "--output", dest="output", default=None, help='The directory to use locally for job files')
    def run(self, input):
        if self.options.output is not None:
            output = self.options.output
        else:
            # output = '%s_%s' % (self.dataset.replace('/','.'),datetime.datetime.now().strftime("%s"))
            # if output.startswith('.'):
            output = '%s_%s' % (self.dataset, datetime.datetime.now().strftime("%s"))
            output = output.lstrip('/')
        if not os.path.exists(output):
            mkdir_p(output)
        return {'JobDir':output, 'PWD':os.getcwd()}

class SourceCFG(Task):
    """Generate a source CFG using 'sourceFileList.py' by listing the specified CASTOR directory. Applies the file wildcard, '--wildcard'"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'SourceCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= this run')
        parser.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= this run')
        parser.add_option("--input-prescale", dest="prescale", default=1, type=int, help='Randomly prescale the number of good files by this factor.')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        pattern = fnmatch.translate(self.options.wildcard)

        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
        # files removed by the prescale will be marked as bad in the comments
        bad_files = [fname for fname in data.listOfFiles() if not fname in good_files]

        source = os.path.join(jobdir, 'source_cfg.py')
        output = file(source, 'w')
        output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files), len(bad_files), self.options.prescale))
        output.write('files = ' + str(good_files) + '\n')
        for bad_file in bad_files:
            output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
        output.close()
        return {'SourceCFG':source}


def insertLines( insertedTo, toInsert ):
    '''Insert a sequence into another sequence.

    The sequence is inserted either at the end, or at the position
    of the HOOK, if it is found.
    The HOOK is considered to be found if
        str(elem).find('###ProductionTaskHook$$$')
    is non-negative for one of the elements in the insertedTo sequence.
    '''
    HOOK = '###ProductionTaskHook$$$'
    hookIndex = None
    for index, line in enumerate(insertedTo):
        line = str(line)
        if line.find(HOOK) > -1:
            hookIndex = index
            break
    if hookIndex is not None:
        before = insertedTo[:hookIndex]
        after = insertedTo[hookIndex:]
        result = before + toInsert + after
        return result
    else:
        insertedTo.extend( toInsert )
        return insertedTo

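# Illustrative example (not part of the original file): if a hook marker is
# present, the new lines land just before it; otherwise they are appended.
#
# >>> insertLines(['a\n', '###ProductionTaskHook$$$\n'], ['b\n'])
# ['a\n', 'b\n', '###ProductionTaskHook$$$\n']
# >>> insertLines(['a\n'], ['b\n'])
# ['a\n', 'b\n']
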
class FullCFG(Task):
    """Generates the full CFG needed to run the job and writes it to the job directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FullCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
        parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']

        if self.options.cfg is None or not os.path.exists(self.options.cfg):
            raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)

        config = file(self.options.cfg).readlines()
        sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
        if sourceFile.lower().endswith('.py'):
            sourceFile = sourceFile[:-3]

        source = os.path.join(jobdir, 'full_cfg.py')
        output = file(source, 'w')

        nEventsPerJob = -1
        if self.options.nEventsPerJob:
            nEventsPerJob = int(self.options.nEventsPerJob)

        toInsert = ['\nfrom %s import *\n' % sourceFile,
                    'process.source.fileNames = files\n',
                    'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
                    'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n',
                    'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard))
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()
        return {'FullCFG':source}

class CheckConfig(Task):
    """Check the basic syntax of a CFG file by running python on it."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']

        child = subprocess.Popen(['python', full], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        if child.returncode != 0:
            raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr, child.returncode))
        return {'Status':'VALID'}

class RunTestEvents(Task):
    """Run cmsRun on the job CFG, but with a small number of events."""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunTestEvents', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = file(full).readlines()
        source = os.path.join(jobdir, 'test_cfg.py')
        output = file(source, 'w')
        toInsert = ['\n',
                    'process.maxEvents.input = cms.untracked.int32(5)\n',
                    'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['cmsRun', os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr, child.returncode)
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'Status':'VALID', 'TestCFG':source}

class ExpandConfig(Task):
    """Runs edmConfigDump to produce an expanded cfg file"""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'ExpandConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = file(full).read()
        source = os.path.join(jobdir, 'test_cfg.py')
        expanded = 'Expanded%s' % os.path.basename(full)
        output = file(source, 'w')
        output.write(config)
        output.write("file('%s','w').write(process.dumpPython())\n" % expanded)
        output.close()

        pwd = os.getcwd()

        result = {}
        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['python', os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to edmConfigDump. Error was '%s' (%i)." % (stderr, child.returncode)
            result['ExpandedFullCFG'] = os.path.join(jobdir, expanded)

        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return result

class WriteToDatasets(Task):
    """Publish the sample to 'Datasets.txt' if required"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteToDatasets', dataset, user, options)
    def run(self, input):
        name = "%s/%s" % (self.dataset, self.options.tier)
        name = name.replace('//','/')
        user = self.options.batch_user
        added = addToDatasets(name, user = user)
        return {'Added':added, 'Name':name, 'User':user}

class RunCMSBatch(Task):
    """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunCMSBatch', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
        parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true', help='Run on the batch system')
        parser.add_option("-N", "--numberOfInputFiles", dest="nInput", help="Number of input files per job", default=5, type=int)
        parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
        parser.add_option("-t", "--tier", dest="tier",
                          help="Tier: extension you can give to specify you are doing a new production. If you give a tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
                          default="")
        parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)

    def run(self, input):
        find = FindOnCastor(self.dataset, self.options.batch_user, self.options)
        find.create = True
        out = find.run({})

        full = input['ExpandConfig']['ExpandedFullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        sampleDir = os.path.join(out['Directory'], self.options.tier)
        sampleDir = castortools.castorToLFN(sampleDir)

        cmd = ['cmsBatch.py', str(self.options.nInput), os.path.basename(full), '-o', '%s_Jobs' % self.options.tier, '--force']
        cmd.extend(['-r', sampleDir])
        if self.options.run_batch:
            jname = "%s/%s" % (self.dataset, self.options.tier)
            jname = jname.replace("//","/")
            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group
            cmd.extend(['-b', "'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue, jname, user_group)])
        print " ".join(cmd)

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)
            returncode = os.system(" ".join(cmd))

            if returncode != 0:
                error = "Running cmsBatch failed. Return code was %i." % returncode
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'SampleDataset':"%s/%s" % (self.dataset, self.options.tier), 'BatchUser':self.options.batch_user,
                'SampleOutputDir':sampleDir, 'LSFJobsTopDir':os.path.join(jobdir, '%s_Jobs' % self.options.tier)}

class MonitorJobs(Task):
    """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'MonitorJobs', dataset, user, options)

    def getjobid(self, job_dir):
        """Parse the LSF output to find the job id"""
        input = os.path.join(job_dir, 'job_id.txt')
        result = None
        if os.path.exists(input):
            contents = file(input).read()
            for c in contents.split('\n'):
                if c and re.match('^Job <\\d*> is submitted to queue <.*>', c) is not None:
                    try:
                        result = c.split('<')[1].split('>')[0]
                    except Exception, e:
                        print >> sys.stderr, 'Job ID parsing error', str(e), c
        return result

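    # Illustrative note (not part of the original file): bsub acknowledges a
    # submission with a line such as
    #     Job <123456> is submitted to queue <8nh>.
    # getjobid() returns the '123456' between the first pair of angle brackets.
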
    def monitor(self, jobs, previous):

        # execute bjobs with a list of job IDs
        cmd = ['bjobs', '-u', self.options.batch_user]
        cmd.extend([v for v in jobs.values() if v is not None]) # filter out unknown IDs
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()

        def parseHeader(header):
            """Parse the header line from bjobs"""
            tokens = [t for t in header.split(' ') if t]
            result = {}
            for i in xrange(len(tokens)):
                result[tokens[i]] = i

            return result

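        # Illustrative note (not part of the original file): a typical bjobs
        # header reads 'JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME',
        # which parseHeader maps to column indices,
        # {'JOBID': 0, 'USER': 1, 'STAT': 2, ...}, so the rows below can be
        # indexed by column name.
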
        result = {}
        if stdout:
            lines = stdout.split('\n')
            if lines:
                header = parseHeader(lines[0])
                if not 'STAT' in header or not 'JOBID' in header:
                    print >> sys.stderr, 'Problem parsing bjobs header\n', lines
                    return result
                for line in lines[1:]:
                    # TODO: unreliable for some fields, e.g. dates
                    tokens = [t for t in line.split(' ') if t]
                    if len(tokens) < len(header): continue
                    id = tokens[header['JOBID']]
                    user = tokens[header['USER']]
                    status = tokens[header['STAT']]

                    result[id] = status

        if stderr:
            lines = stderr.split('\n')
            if lines:
                for line in lines:
                    if line and re.match('^Job <\\d*> is not found', line) is not None:
                        try:
                            id = line.split('<')[1].split('>')[0]
                            if not result.has_key(id) and not previous.has_key(id):
                                result[id] = 'FORGOTTEN'
                        except Exception, e:
                            print >> sys.stderr, 'Job ID parsing error in STDERR', str(e), line

        # after one hour the status is no longer available from bjobs,
        # so fall back to the last known status
        if result:
            for id in jobs.values():
                if not result.has_key(id) and previous.has_key(id):
                    result[id] = previous[id]
        return result

    def run(self, input):

        # return #COLIN
        jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
        if not os.path.exists(jobsdir):
            raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)

        subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
        jobs = {}
        for s in subjobs:
            jobs[s] = self.getjobid(s)

        def checkStatus(stat):

            # gzip log files on the fly
            actions = {'FilesToCompress':{'Files':[]}}

            result = {}
            for j, id in jobs.iteritems():
                if id is None:
                    result[j] = 'UNKNOWN'
                else:
                    if stat.has_key(id):
                        result[j] = stat[id]
                        if result[j] in ['DONE','EXIT','FORGOTTEN']:
                            stdout = os.path.join(j, 'LSFJOB_%s' % id, 'STDOUT')
                            if os.path.exists(stdout):
                                # compress this file
                                actions['FilesToCompress']['Files'].append(stdout)
                                result[j] = '%s.gz' % stdout
                            elif os.path.exists('%s.gz' % stdout):
                                result[j] = '%s.gz' % stdout
                            else:
                                result[j] = 'NOSTDOUT'

                            # also compress the stderr, although this is mostly empty
                            stderr = os.path.join(j, 'LSFJOB_%s' % id, 'STDERR')
                            if os.path.exists(stderr):
                                # compress this file
                                actions['FilesToCompress']['Files'].append(stderr)

            compress = GZipFiles(self.dataset, self.user, self.options)
            compress.run(actions)
            return result

        def countJobs(stat):
            """Count jobs that are monitorable - i.e. not in a final state"""
            result = []
            for j, id in jobs.iteritems():
                if id is not None and stat.has_key(id):
                    st = stat[id]
                    if st in ['PEND','PSUSP','RUN','USUSP','SSUSP','WAIT']:
                        result.append(id)
            return result

        def writeKillScript(mon):
            """Write a shell script to kill the jobs we know about"""
            kill = os.path.join(jobsdir, 'kill_jobs.sh')
            output = file(kill, 'w')
            # the shebang must be the very first line of the script
            script = """#!/usr/bin/env bash
echo "Killing jobs"
bkill -u %s %s
""" % (self.options.batch_user, " ".join(mon))
            output.write(script)
            output.close()
            return mon

        # continue monitoring while there are jobs to monitor
        status = self.monitor(jobs, {})
        monitorable = writeKillScript(countJobs(status))
        count = 0

        while monitorable:
            job_status = checkStatus(status)
            time.sleep(60)
            status = self.monitor(jobs, status)
            monitorable = writeKillScript(countJobs(status))
            if not (count % 3):
                print '%s: Monitoring %i jobs (%s)' % (self.name, len(monitorable), self.dataset)
            count += 1

        return {'LSFJobStatus':checkStatus(status), 'LSFJobIDs':jobs}

class CheckJobStatus(Task):
    """Checks the job STDOUT to catch common problems like exceptions and exceeded CPU time. Sets the job status in the report accordingly."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckJobStatus', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--output_wildcard", dest="output_wildcard", help="The wildcard to use when testing the output of this production (defaults to same as -w)", default=None)
    def run(self, input):

        job_status = input['MonitorJobs']['LSFJobStatus']

        result = {}
        for j, status in job_status.iteritems():
            valid = True
            if os.path.exists(status):

                fileHandle = None
                if status.endswith('.gz') or status.endswith('.GZ'):
                    fileHandle = gzip.GzipFile(status)
                else:
                    fileHandle = file(status)

                open_count = 0
                close_count = 0
                for line in fileHandle:
                    # start by counting files opened and closed
                    # (suggestion from Enrique)
                    if 'pened file' in line:
                        open_count += 1
                    if 'losed file' in line:
                        close_count += 1

                    if 'Exception' in line:
                        result[j] = 'Exception'
                        valid = False
                        break
                    elif 'CPU time limit exceeded' in line:
                        result[j] = 'CPUTimeExceeded'
                        valid = False
                        break
                    elif 'Killed' in line:
                        result[j] = 'JobKilled'
                        valid = False
                        break
                    elif 'A fatal system signal has occurred' in line:
                        result[j] = 'SegFault'
                        valid = False
                        break

                if valid and open_count != close_count:
                    result[j] = 'FileOpenCloseMismatch'
                    valid = False
                if valid:
                    result[j] = 'VALID'
            else:
                result[j] = status

        # allow a different wildcard in the final check
        options = copy.deepcopy(self.options)
        if self.options.output_wildcard is not None:
            options.wildcard = self.options.output_wildcard

        mask = GenerateMask(input['RunCMSBatch']['SampleDataset'], self.options.batch_user, options)
        report = mask.run({'CheckForMask':{'MaskPresent':False}})
        report['LSFJobStatusCheck'] = result
        return report

class WriteJobReport(Task):
    """Write a summary report on each job"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteJobReport', dataset, user, options)
    def run(self, input):

        report = input['CheckJobStatus']

        # collect a list of jobs by status
        states = {}
        for j, status in report['LSFJobStatusCheck'].iteritems():
            if not states.has_key(status):
                states[status] = []
            states[status].append(j)
        jobdir = input['CreateJobDirectory']['PWD']
        if not os.path.exists(jobdir):
            raise Exception("Top level job directory not found: '%s'" % jobdir)
        report_file = os.path.join(input['CreateJobDirectory']['JobDir'], 'resubmit.sh')

        output = file(report_file, 'w')
        output.write('#!/usr/bin/env bash\n')

        if report['MaskPresent']:
            mask = report['Report']
            output.write('#PrimaryDatasetFraction: %f\n' % mask['PrimaryDatasetFraction'])
            output.write('#FilesGood: %i\n' % mask['FilesGood'])
            output.write('#FilesBad: %i\n' % mask['FilesBad'])

        user_group = ''
        if self.options.group is not None:
            user_group = '-G %s' % self.options.group

        for status, jobs in states.iteritems():
            output.write('# %d jobs found in state %s\n' % (len(jobs), status))
            if status == 'VALID':
                continue
            for j in jobs:
                jdir = os.path.join(jobdir, j)
                output.write('pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir, self.options.queue, user_group))
        output.close()

        return {'SummaryFile':report_file}

class CleanJobFiles(Task):
    """Removes and compresses auto-generated files from the job directory to save space."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanJobFiles', dataset, user, options)
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        jobs = input['MonitorJobs']['LSFJobIDs']
        job_status = input['MonitorJobs']['LSFJobStatus']

        actions = {'FilesToCompress':{'Files':[]}, 'FilesToClean':{'Files':[]}}

        actions['FilesToClean']['Files'].append(input['ExpandConfig']['ExpandedFullCFG'])
        if input.has_key('RunTestEvents'):
            actions['FilesToClean']['Files'].append(input['RunTestEvents']['TestCFG'])

        for rt in glob.iglob('%s/*.root' % jobdir):
            actions['FilesToClean']['Files'].append(rt)
        for pyc in glob.iglob('%s/*.pyc' % jobdir):
            actions['FilesToClean']['Files'].append(pyc)

        for j in jobs:
            status = job_status[j]
            if os.path.exists(status) and not status.endswith('.gz'):
                actions['FilesToCompress']['Files'].append(status)

        compress = GZipFiles(self.dataset, self.user, self.options)
        compressed = compress.run(actions)

        clean = CleanFiles(self.dataset, self.user, self.options)
        removed = clean.run(actions)
        return {'Cleaned':removed, 'Compressed':compressed}

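# A minimal sketch of how a driver might chain these tasks (illustrative
# only; runTasks is hypothetical and the real driver lives in
# ProductionTasks.py): each task's output dictionary is stored under the
# task's name, and the accumulated dictionary is passed as 'input' to the
# next task, which is how keys like input['CreateJobDirectory']['JobDir']
# are resolved above.
#
# def runTasks(tasks):
#     previous = {}
#     for task in tasks:
#         previous[task.getname()] = task.run(previous)
#     return previous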