import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
import glob
import gzip
import errno

from edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
from addToDatasets import addToDatasets

import Das

import eostools as castortools

from dataset import Dataset
from datasetToSource import createDataset
from castorBaseDir import castorBaseDir

def mkdir_p(path):
    """Create a directory, ignoring the error if it already exists."""
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            pass
        else:
            raise

25 """Base class for Task API"""
26 def __init__(self, name, dataset, user, options, instance = None):
33 """The name of the object, using the instance if needed"""
39 """A hook for adding things to the parser"""
42 """Basic API for a task. input and output are dictionaries"""
46 """Common options for the script __main__: used by all production tasks"""
48 Task.__init__(self,
'ParseOptions', dataset, user, options)
50 usage =
"""%prog [options] <dataset>
52 The %prog script aims to take a list of samples and process them on the batch system. Submission
53 may be done serially (by setting --max_threads to 1), or in parallel (the default).
57 1) Check that the sample to run on exists
58 2) Generate a source CFG
59 3) Run locally and check everything works with a small number of events
60 4) Submit to the batch system
61 5) Wait until the jobs are finished
62 6) Check the jobs ran OK and that the files are good
66 ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2
68 It is often useful to store the sample names in a file, in which case you could instead do:
70 ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`
72 An example file might contain:
74 palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
75 benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
76 wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
78 The CASTOR username for each sample is given before the '%'.
80 Each step in the flow has a task associated with it, which may set options. The options for each task are
84 self.
das = Das.DASOptionParser(usage=usage)
86 parser.add_option(
"-u",
"--user", dest=
"user", default=os.getlogin(),help=
'The username to use when looking at mass storage devices. Your login username is used by default.')
87 parser.add_option(
"-w",
"--wildcard", dest=
"wildcard", default=
'*.root',help=
'A UNIX style wildcard to specify which input files to check before submitting the jobs')
88 parser.add_option(
"--max_threads", dest=
"max_threads", default=
None,help=
'The maximum number of threads to use in the production')
92 self.
user = self.options.user
94 raise Exception(
'TaskError: No dataset specified')
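# A sketch of how a driver can compose the per-task option hooks before
# parsing: each task registers its own flags on the shared parser. The
# function name is illustrative; the real __main__ wiring is not shown here.
def registerTaskOptions(parser, tasks):
    for task in tasks:
        task.addOption(parser)
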
98 """Use 'datasets.py' to check that the dataset exists in the production system.
101 Task.__init__(self,
'CheckDatasetExists', dataset, user, options)
103 pattern = fnmatch.translate(self.options.wildcard)
104 run_range = (self.options.min_run, self.options.max_run)
106 if( len(data.listOfGoodFiles()) == 0 ):
107 raise Exception(
'no good root file in dataset %s | %s | %s | %s' % (self.
user,
109 self.options.wildcard,
111 return {
'Dataset':self.
dataset}
114 """Query DAS to find dataset name in DBS - see https://cmsweb.cern.ch/das/"""
116 Task.__init__(self,
'BaseDataset', dataset, user, options)
118 parser.add_option(
"-n",
"--name", dest=
"name", default=
None,help=
'The name of the dataset in DAS. Will be guessed if not specified')
120 """Query DAS to find out how many events are in the dataset"""
122 host = self.options.host
123 debug = self.options.verbose
124 idx = self.options.idx
125 limit = self.options.limit
128 query =
'dataset=%s' % ds
129 result = Das.get_data(host, query, idx, limit, debug)
130 result = result.replace(
'null',
'None')
131 result = result.replace(
'true',
'True')
132 result = result.replace(
'false',
'False')
134 if data[
'status'] !=
'ok':
135 raise Exception(
"Das query failed: Output is '%s'" % data)
136 return (data[
'data'],data)
141 if self.options.name
is None:
143 tokens = [t
for t
in dataset.split(os.sep)
if t]
146 ds = os.sep + os.sep.join(tokens[0:3])
148 exists, data =
check(ds)
149 self.options.name = ds
151 exists, data =
check(self.options.name)
153 raise Exception(
"Specified dataset '%s' not found in Das. Please check." % self.options.name)
156 raise Exception(
"Dataset '%s' not found in Das. Please check." % self.
dataset)
161 if (hasattr(self.
options,
'check')
and self.options.check)
or not hasattr(self.
options,
'check'):
163 return {
'Name':self.options.name,
'Das':output}
166 """GZip a list of files"""
168 Task.__init__(self,
'GZipFiles', dataset, user, options)
170 output =
'%s.gz' % fileName
172 f_in = open(fileName,
'rb')
173 f_out = gzip.open(output,
'wb')
174 f_out.writelines(f_in)
182 files = input[
'FilesToCompress'][
'Files']
186 if f
is None or not f:
continue
187 if os.path.exists(f):
189 compressed.append(gz)
190 return {
'CompressedFiles':compressed}
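# The compression and cleaning tasks share a small 'actions' dict contract:
# file lists keyed by the consuming task. A sketch (the path is illustrative):
def _gzipActionsExample(dataset, user, options):
    gzipper = GZipFiles(dataset, user, options)
    return gzipper.run({'FilesToCompress': {'Files': ['/tmp/LSFJOB_1/STDOUT']}})
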
193 """Remove a list of files"""
195 Task.__init__(self,
'CleanFiles', dataset, user, options)
197 files = input[
'FilesToClean'][
'Files']
200 if f
is None or not f:
continue
201 if os.path.exists(f): os.remove(f)
203 return {
'CleanedFiles':removed}
206 """Checks that the sample specified exists in the CASTOR area of the user specified. The directory must exist."""
208 Task.__init__(self,
'FindOnCastor', dataset, user, options)
211 return {
'Topdir':
None,
'Directory':
None}
213 directory =
'%s/%s' % (topdir,self.
dataset)
215 if not castortools.fileExists(directory):
216 if hasattr(self,
'create')
and self.create:
217 castortools.createCastorDir(directory)
219 if not castortools.isDirectory(directory):
220 raise Exception(
"Dataset directory '%s' does not exist or could not be created" % directory)
221 return {
'Topdir':topdir,
'Directory':directory}
224 """Tests if a file mask, created by edmIntegrityCheck.py, is present already and reads it if so."""
226 Task.__init__(self,
'CheckForMask', dataset, user, options)
228 parser.add_option(
"-c",
"--check", dest=
"check", default=
False, action=
'store_true',help=
'Check filemask if available')
232 return {
'MaskPresent':
True,
'Report':
'Files taken from DBS'}
234 dir = input[
'FindOnCastor'][
'Directory']
235 mask =
"IntegrityCheck"
239 if (hasattr(self.
options,
'check')
and self.options.check)
or not hasattr(self.
options,
'check'):
240 file_mask = castortools.matchingFiles(dir,
'^%s_.*\.txt$' % mask)
245 return {
'MaskPresent':report
is not None,
'Report':report}
248 """Checks whether you have write access to the CASTOR directory specified"""
250 Task.__init__(self,
'CheckForWrite', dataset, user, options)
252 """Check that the directory is writable"""
254 return {
'Directory':
None,
'WriteAccess':
True}
255 dir = input[
'FindOnCastor'][
'Directory']
256 if self.options.check:
258 _, name = tempfile.mkstemp(
'.txt',text=
True)
259 testFile =
file(name,
'w')
260 testFile.write(
'Test file')
263 store = castortools.castorToLFN(dir)
265 if not os.system(
'cmsStage %s %s' % (name,store)):
266 fname =
'%s/%s' % (dir,os.path.basename(name))
267 write = castortools.fileExists(fname)
269 castortools.rm(fname)
271 raise Exception(
"Failed to write to directory '%s'" % dir)
273 return {
'Directory':dir,
'WriteAccess':
True}
276 """Uses edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
278 Task.__init__(self,
'GenerateMask', dataset, user, options)
280 parser.add_option(
"-r",
"--recursive", dest=
"resursive", default=
False, action=
'store_true',help=
'Walk the mass storage device recursively')
281 parser.add_option(
"-p",
"--printout", dest=
"printout", default=
False, action=
'store_true',help=
'Print a report to stdout')
285 if self.options.check
and not input[
'CheckForMask'][
'MaskPresent']:
287 options = copy.deepcopy(self.
options)
288 options.user = self.
user
290 if input.has_key(
'BaseDataset'):
291 options.name = input[
'BaseDataset'][
'Name']
297 report = check.structured()
300 elif input[
'CheckForMask'][
'MaskPresent']:
301 report = input[
'CheckForMask'][
'Report']
303 return {
'MaskPresent':report
is not None,
'Report':report}
306 """Generates a job directory on your local drive"""
308 Task.__init__(self,
'CreateJobDirectory', dataset, user, options)
310 parser.add_option(
"-o",
"--output", dest=
"output", default=
None,help=
'The directory to use locally for job files')
312 if self.options.output
is not None:
313 output = self.options.output
317 output =
'%s_%s' % (self.
dataset,datetime.datetime.now().strftime(
"%s"))
318 output = output.lstrip(
'/')
319 if not os.path.exists(output):
321 return {
'JobDir':output,
'PWD':os.getcwd()}
324 """Generate a source CFG using 'sourceFileList.py' by listing the CASTOR directory specified. Applies the file wildcard, '--wildcard'"""
326 Task.__init__(self,
'SourceCFG', dataset, user, options)
328 parser.add_option(
"--min-run", dest=
"min_run", default=-1, type=int, help=
'When querying DBS, require runs >= than this run')
329 parser.add_option(
"--max-run", dest=
"max_run", default=-1, type=int, help=
'When querying DBS, require runs <= than this run')
330 parser.add_option(
"--input-prescale", dest=
"prescale", default=1, type=int, help=
'Randomly prescale the number of good files by this factor.')
333 jobdir = input[
'CreateJobDirectory'][
'JobDir']
334 pattern = fnmatch.translate(self.options.wildcard)
336 run_range = (self.options.min_run, self.options.max_run)
338 good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
340 bad_files = [fname
for fname
in data.listOfFiles()
if not fname
in good_files]
342 source = os.path.join(jobdir,
'source_cfg.py')
343 output =
file(source,
'w')
344 output.write(
'###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files),len(bad_files),self.options.prescale) )
345 output.write(
'files = ' + str(good_files) +
'\n')
346 for bad_file
in bad_files:
347 output.write(
"###SourceCFG:\tBadInMask '%s'\n" % bad_file)
349 return {
'SourceCFG':source}
def insertLines(insertedTo, toInsert):
    '''Insert a sequence into another sequence.

    The sequence is inserted either at the end, or at the position
    of the HOOK, if it is found.
    The HOOK is considered as being found if
        str(elem).find(###ProductionTaskHook$$$)
    is true for one of the elements in the insertedTo sequence.
    '''
    HOOK = '###ProductionTaskHook$$$'
    hookIndex = None
    for index, line in enumerate(insertedTo):
        if line.find(HOOK) > -1:
            hookIndex = index
            break

    if hookIndex is not None:
        before = insertedTo[:hookIndex]
        after = insertedTo[hookIndex:]
        return before + toInsert + after
    else:
        insertedTo.extend(toInsert)
        return insertedTo

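# Example of the hook mechanism (a sketch, with illustrative cfg lines): the
# payload is spliced in just before the hook line, which stays in place.
def _insertLinesExample():
    cfg = ['import FWCore.ParameterSet.Config as cms\n',
           '###ProductionTaskHook$$$\n',
           'process.out.fileName = "output.root"\n']
    payload = ['process.maxEvents.input = cms.untracked.int32(5)\n']
    return insertLines(cfg, payload)
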
379 """Generate the full CFG needed to run the job and writes it to the job directory"""
381 Task.__init__(self,
'FullCFG', dataset, user, options)
383 parser.add_option(
"--cfg", dest=
"cfg", default=
None, help=
'The top level CFG to run')
384 parser.add_option(
"--nEventsPerJob", dest=
"nEventsPerJob", default=
None, help=
'Number of events per job (for testing)')
387 jobdir = input[
'CreateJobDirectory'][
'JobDir']
389 if self.options.cfg
is None or not os.path.exists(self.options.cfg):
390 raise Exception(
"The file '%s' does not exist. Please check." % self.options.cfg)
392 config =
file(self.options.cfg).readlines()
393 sourceFile = os.path.basename(input[
'SourceCFG'][
'SourceCFG'])
394 if sourceFile.lower().endswith(
'.py'):
395 sourceFile = sourceFile[:-3]
397 source = os.path.join(jobdir,
'full_cfg.py')
398 output =
file(source,
'w')
401 if self.options.nEventsPerJob:
402 nEventsPerJob = int(self.options.nEventsPerJob)
404 toInsert = [
'\nfrom %s import *\n' % sourceFile,
405 'process.source.fileNames = files\n',
406 'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.
format(nEvents=nEventsPerJob),
407 'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n'
408 'datasetInfo = ("%s","%s","%s")\n' % (self.
user, self.
dataset, fnmatch.translate(self.options.wildcard) )
411 output.writelines(config)
413 return {
'FullCFG':source}
416 """Check the basic syntax of a CFG file by running python on it."""
418 Task.__init__(self,
'CheckConfig', dataset, user, options)
421 full = input[
'FullCFG'][
'FullCFG']
423 child = subprocess.Popen([
'python',full], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
424 stdout, stderr = child.communicate()
425 if child.returncode != 0:
426 raise Exception(
"Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr,child.returncode))
427 return {
'Status':
'VALID'}
430 """Run cmsRun but with a small number of events on the job CFG."""
433 Task.__init__(self,
'RunTestEvents', dataset, user, options)
436 full = input[
'FullCFG'][
'FullCFG']
437 jobdir = input[
'CreateJobDirectory'][
'JobDir']
439 config =
file(full).readlines()
440 source = os.path.join(jobdir,
'test_cfg.py')
441 output =
file(source,
'w')
443 'process.maxEvents.input = cms.untracked.int32(5)\n',
444 'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
447 output.writelines(config)
456 child = subprocess.Popen([
'cmsRun',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
457 stdout, stderr = child.communicate()
459 if child.returncode != 0:
460 error =
"Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr,child.returncode)
464 if error
is not None:
467 return {
'Status':
'VALID',
'TestCFG':source}
470 """Runs edmConfigDump to produce an expanded cfg file"""
473 Task.__init__(self,
'ExpandConfig', dataset, user, options)
476 full = input[
'FullCFG'][
'FullCFG']
477 jobdir = input[
'CreateJobDirectory'][
'JobDir']
480 source = os.path.join(jobdir,
'test_cfg.py')
481 expanded =
'Expanded%s' % os.path.basename(full)
482 output =
file(source,
'w')
484 output.write(
"file('%s','w').write(process.dumpPython())\n" % expanded)
494 child = subprocess.Popen([
'python',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
495 stdout, stderr = child.communicate()
497 if child.returncode != 0:
498 error =
"Failed to edmConfigDump. Error was '%s' (%i)." % (stderr,child.returncode)
499 result[
'ExpandedFullCFG'] = os.path.join(jobdir,expanded)
504 if error
is not None:
510 """Publish the sample to 'Datasets.txt' if required"""
512 Task.__init__(self,
'WriteToDatasets', dataset, user, options)
514 name =
"%s/%s" % (self.
dataset,self.options.tier)
515 name = name.replace(
'//',
'/')
516 user = self.options.batch_user
518 return {
'Added':added,
'Name':name,
'User':user}
521 """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""
524 Task.__init__(self,
'RunCMSBatch', dataset, user, options)
526 parser.add_option(
"--batch_user", dest=
"batch_user", help=
"The user for LSF", default=os.getlogin())
527 parser.add_option(
"--run_batch", dest=
"run_batch", default=
True, action=
'store_true',help=
'Run on the batch system')
528 parser.add_option(
"-N",
"--numberOfInputFiles", dest=
"nInput",help=
"Number of input files per job",default=5,type=int)
529 parser.add_option(
"-q",
"--queue", dest=
"queue", help=
"The LSF queue to use", default=
"1nh")
530 parser.add_option(
"-t",
"--tier", dest=
"tier",
531 help=
"Tier: extension you can give to specify you are doing a new production. If you give a Tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
533 parser.add_option(
"-G",
"--group", dest=
"group", help=
"The LSF user group to use, e.g. 'u_zh'", default=
None)
540 full = input[
'ExpandConfig'][
'ExpandedFullCFG']
541 jobdir = input[
'CreateJobDirectory'][
'JobDir']
543 sampleDir = os.path.join(out[
'Directory'],self.options.tier)
544 sampleDir = castortools.castorToLFN(sampleDir)
546 cmd = [
'cmsBatch.py',str(self.options.nInput),os.path.basename(full),
'-o',
'%s_Jobs' % self.options.tier,
'--force']
547 cmd.extend([
'-r',sampleDir])
548 if self.options.run_batch:
549 jname =
"%s/%s" % (self.
dataset,self.options.tier)
550 jname = jname.replace(
"//",
"/")
552 if self.options.group
is not None:
553 user_group =
'-G %s' % self.options.group
554 cmd.extend([
'-b',
"'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue,jname,user_group)])
562 returncode = os.system(
" ".
join(cmd))
565 error =
"Running cmsBatch failed. Return code was %i." % returncode
569 if error
is not None:
572 return {
'SampleDataset':
"%s/%s" % (self.
dataset,self.options.tier),
'BatchUser':self.options.batch_user,
573 'SampleOutputDir':sampleDir,
'LSFJobsTopDir':os.path.join(jobdir,
'%s_Jobs' % self.options.tier)}
576 """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
578 Task.__init__(self,
'MonitorJobs', dataset, user, options)
581 """Parse the LSF output to find the job id"""
582 input = os.path.join(job_dir,
'job_id.txt')
584 if os.path.exists(input):
586 for c
in contents.split(
'\n'):
587 if c
and re.match(
'^Job <\\d*> is submitted to queue <.*>',c)
is not None:
589 result = c.split(
'<')[1].
split(
'>')[0]
591 print >> sys.stderr,
'Job ID parsing error',str(e),c
597 cmd = [
'bjobs',
'-u',self.options.batch_user]
598 cmd.extend([v
for v
in jobs.values()
if v
is not None])
599 child = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
600 stdout, stderr = child.communicate()
603 """Parse the header from bjobs"""
604 tokens = [t
for t
in header.split(
' ')
if t]
606 for i
in xrange(len(tokens)):
607 result[tokens[i]] = i
613 lines = stdout.split(
'\n')
616 if not 'STAT' in header
or not 'JOBID' in header:
617 print >> sys.stderr,
'Problem parsing bjobs header\n',lines
619 for line
in lines[1:]:
621 tokens = [t
for t
in line.split(
' ')
if t]
622 if len(tokens) < len(header):
continue
623 id = tokens[header[
'JOBID']]
624 user = tokens[header[
'USER']]
625 status = tokens[header[
'STAT']]
630 lines = stderr.split(
'\n')
633 if line
and re.match(
'^Job <\\d*> is not found',line)
is not None:
635 id = line.split(
'<')[1].
split(
'>')[0]
636 if not result.has_key(id)
and not previous.has_key(id):
637 result[id] =
'FORGOTTEN'
639 print >> sys.stderr,
'Job ID parsing error in STDERR',str(e),line
643 for id
in jobs.values():
644 if not result.has_key(id)
and previous.has_key(id):
645 result[id] = previous[id]
651 jobsdir = input[
'RunCMSBatch'][
'LSFJobsTopDir']
652 if not os.path.exists(jobsdir):
653 raise Exception(
"LSF jobs dir does not exist: '%s'" % jobsdir)
655 subjobs = [s
for s
in glob.glob(
"%s/Job_[0-9]*" % jobsdir)
if os.path.isdir(s)]
663 actions = {
'FilesToCompress':{
'Files':[]}}
666 for j, id
in jobs.iteritems():
668 result[j] =
'UNKNOWN'
672 if result[j]
in [
'DONE',
'EXIT',
'FORGOTTEN']:
673 stdout = os.path.join(j,
'LSFJOB_%s' % id,
'STDOUT')
674 if os.path.exists(stdout):
676 actions[
'FilesToCompress'][
'Files'].
append(stdout)
677 result[j] =
'%s.gz' % stdout
678 elif os.path.exists(
'%s.gz' % stdout):
679 result[j] =
'%s.gz' % stdout
681 result[j] =
'NOSTDOUT'
684 stderr = os.path.join(j,
'LSFJOB_%s' % id,
'STDERR')
685 if os.path.exists(stderr):
687 actions[
'FilesToCompress'][
'Files'].
append(stderr)
690 compress.run(actions)
694 """Count jobs that are monitorable - i.e. not in a final state"""
696 for j, id
in jobs.iteritems():
697 if id
is not None and stat.has_key(id):
699 if st
in [
'PEND',
'PSUSP',
'RUN',
'USUSP',
'SSUSP',
'WAIT']:
703 def writeKillScript(mon):
704 """Write a shell script to kill the jobs we know about"""
705 kill = os.path.join(jobsdir,
'kill_jobs.sh')
706 output =
file(kill,
'w')
711 """ % (self.options.batch_user,
" ".
join(mon))
718 monitorable = writeKillScript(countJobs(status))
724 status = self.
monitor(jobs,status)
725 monitorable = writeKillScript(countJobs(status))
727 print '%s: Monitoring %i jobs (%s)' % (self.
name,len(monitorable),self.
dataset)
730 return {
'LSFJobStatus':
checkStatus(status),
'LSFJobIDs':jobs}
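# Sketch of the bjobs parsing used above: the header line gives each column an
# index, so rows can be read by column name. The sample line is illustrative,
# not captured bjobs output.
def _bjobsHeaderExample():
    header = 'JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME'
    columns = {}
    tokens = [t for t in header.split(' ') if t]
    for i in xrange(len(tokens)):
        columns[tokens[i]] = i
    return columns['JOBID'], columns['STAT']
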
733 """Checks the job STDOUT to catch common problems like exceptions, CPU time exceeded. Sets the job status in the report accordingly."""
735 Task.__init__(self,
'CheckJobStatus', dataset, user, options)
737 parser.add_option(
"--output_wildcard", dest=
"output_wildcard", help=
"The wildcard to use when testing the output of this production (defaults to same as -w)", default=
None)
740 job_status = input[
'MonitorJobs'][
'LSFJobStatus']
743 for j, status
in job_status.iteritems():
745 if os.path.exists(status):
748 if status.endswith(
'.gz')
or status.endswith(
'.GZ'):
749 fileHandle = gzip.GzipFile(status)
751 fileHandle =
file(status)
755 for line
in fileHandle:
758 if 'pened file' in line:
760 if 'losed file' in line:
763 if 'Exception' in line:
764 result[j] =
'Exception'
767 elif 'CPU time limit exceeded' in line:
768 result[j] =
'CPUTimeExceeded'
771 elif 'Killed' in line:
772 result[j] =
'JobKilled'
775 elif 'A fatal system signal has occurred' in line:
776 result[j] =
'SegFault'
780 if valid
and open_count != close_count:
781 result[j] =
'FileOpenCloseMismatch'
789 options = copy.deepcopy(self.
options)
790 if self.options.output_wildcard
is not None:
791 options.wildcard = self.options.output_wildcard
793 mask =
GenerateMask(input[
'RunCMSBatch'][
'SampleDataset'],self.options.batch_user,options)
794 report = mask.run({
'CheckForMask':{
'MaskPresent':
False}})
795 report[
'LSFJobStatusCheck'] = result
799 """Write a summary report on each job"""
801 Task.__init__(self,
'WriteJobReport', dataset, user, options)
804 report = input[
'CheckJobStatus']
808 for j, status
in report[
'LSFJobStatusCheck'].iteritems():
809 if not states.has_key(status):
812 jobdir = input[
'CreateJobDirectory'][
'PWD']
813 if not os.path.exists(jobdir):
814 raise Exception(
"Top level job directory not found: '%s'" % jobdir)
815 report_file = os.path.join(input[
'CreateJobDirectory'][
'JobDir'],
'resubmit.sh')
817 output =
file(report_file,
'w')
818 output.write(
'#!/usr/bin/env bash\n')
820 if report[
'MaskPresent']:
821 mask = report[
'Report']
822 output.write(
'#PrimaryDatasetFraction: %f\n' % mask[
'PrimaryDatasetFraction'])
823 output.write(
'#FilesGood: %i\n' % mask[
'FilesGood'])
824 output.write(
'#FilesBad: %i\n' % mask[
'FilesBad'])
827 if self.options.group
is not None:
828 user_group =
'-G %s' % self.options.group
830 for status, jobs
in states.iteritems():
831 output.write(
'# %d jobs found in state %s\n' % (len(jobs),status) )
832 if status ==
'VALID':
835 jdir = os.path.join(jobdir,j)
836 output.write(
'pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir,self.options.queue,user_group))
839 return {
'SummaryFile':report_file}
842 """Removes and compresses auto-generated files from the job directory to save space."""
844 Task.__init__(self,
'CleanJobFiles', dataset, user, options)
847 jobdir = input[
'CreateJobDirectory'][
'JobDir']
848 jobs = input[
'MonitorJobs'][
'LSFJobIDs']
849 job_status = input[
'MonitorJobs'][
'LSFJobStatus']
851 actions = {
'FilesToCompress':{
'Files':[]},
'FilesToClean':{
'Files':[]}}
853 actions[
'FilesToClean'][
'Files'].
append(input[
'ExpandConfig'][
'ExpandedFullCFG'])
854 if input.has_key(
'RunTestEvents'):
855 actions[
'FilesToClean'][
'Files'].
append(input[
'RunTestEvents'][
'TestCFG'])
857 for rt
in glob.iglob(
'%s/*.root' % jobdir):
858 actions[
'FilesToClean'][
'Files'].
append(rt)
859 for pyc
in glob.iglob(
'%s/*.pyc' % jobdir):
860 actions[
'FilesToClean'][
'Files'].
append(pyc)
863 status = job_status[j]
864 if os.path.exists(status)
and not status.endswith(
'.gz'):
865 actions[
'FilesToCompress'][
'Files'].
append(status)
868 compressed = compress.run(actions)
871 removed = clean.run(actions)
872 return {
'Cleaned':removed,
'Compressed':compressed}
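# A sketch of how these tasks chain together (the real driver in __main__ may
# differ): each task's output dict is stored under its name, so downstream
# tasks can look up e.g. input['CreateJobDirectory']['JobDir'].
def runTasks(tasks):
    state = {}
    for task in tasks:
        state[task.getname()] = task.run(state)
    return state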