from __future__ import print_function
from __future__ import absolute_import

from builtins import range
import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
import errno
import glob
import gzip

import six

from .edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
from .addToDatasets import addToDatasets

from . import eostools as castortools
from . import das as Das

from .dataset import Dataset
from .datasetToSource import createDataset
from .castorBaseDir import castorBaseDir
def mkdir_p(path):
    """Create a directory, ignoring the error raised if it already exists."""
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            pass
        else:
            raise
class Task(object):
    """Base class for Task API"""
    def __init__(self, name, dataset, user, options, instance = None):
        self.name = name
        self.instance = instance
        self.dataset = dataset
        self.user = user
        self.options = options

    def getname(self):
        """The name of the object, using the instance if needed"""
        if self.instance is not None:
            return '%s_%s' % (self.name, self.instance)
        else:
            return self.name

    def addOption(self, parser):
        """A hook for adding things to the parser"""
        pass

    def run(self, input):
        """Basic API for a task. input and output are dictionaries"""
        return {}

class ParseOptions(Task):
    """Common options for the script __main__: used by all production tasks"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'ParseOptions', dataset, user, options)

        usage = """%prog [options] <dataset>

The %prog script aims to take a list of samples and process them on the batch system. Submission
may be done serially (by setting --max_threads to 1), or in parallel (the default).

The basic flow is:

    1) Check that the sample to run on exists
    2) Generate a source CFG
    3) Run locally and check everything works with a small number of events
    4) Submit to the batch system
    5) Wait until the jobs are finished
    6) Check the jobs ran OK and that the files are good

An example command is:

ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2

It is often useful to store the sample names in a file, in which case you could instead do:

ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`

An example file might contain:

palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2

The CASTOR username for each sample is given before the '%'.

Each step in the flow has a task associated with it, which may set options. The options for each task are
documented below.
"""
        self.das = Das.DASOptionParser(usage=usage)

    def addOption(self, parser):
        parser.add_option("-u", "--user", dest="user", default=os.getlogin(),
                          help='The username to use when looking at mass storage devices. Your login username is used by default.')
        parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root',
                          help='A UNIX style wildcard to specify which input files to check before submitting the jobs')
        parser.add_option("--max_threads", dest="max_threads", default=None,
                          help='The maximum number of threads to use in the production')

    def run(self, input):
        self.options, self.dataset = self.das.get_opt()
        self.user = self.options.user
        if not self.dataset:
            raise Exception('TaskError: No dataset specified')
        return {'Options':self.options, 'Dataset':self.dataset}
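# Hedged illustration (not from the original file): how a driver is expected to
# chain Task objects, passing each task the dictionary of outputs produced so far,
# keyed by task name. The '_exampleTaskChain' helper and the task selection are
# invented; only the run()/getname() API used here comes from the classes in this
# module.
def _exampleTaskChain(dataset, user, options):
    """Run a couple of tasks in sequence, accumulating their outputs."""
    tasks = [CheckDatasetExists(dataset, user, options),
             CreateJobDirectory(dataset, user, options)]
    previous = {}
    for t in tasks:
        output = t.run(previous)          # each task reads what earlier tasks produced
        previous[t.getname()] = output    # later tasks look this up, e.g. input['CreateJobDirectory']['JobDir']
    return previous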
class CheckDatasetExists(Task):
    """Use 'datasets.py' to check that the dataset exists in the production system."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckDatasetExists', dataset, user, options)
    def run(self, input):
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range=run_range)
        if len(data.listOfGoodFiles()) == 0:
            raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
                                                                                self.dataset,
                                                                                self.options.wildcard,
                                                                                run_range))
        return {'Dataset':self.dataset}
class BaseDataset(Task):
    """Query DAS to find the dataset name in DBS - see https://cmsweb.cern.ch/das/"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'BaseDataset', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-n", "--name", dest="name", default=None,
                          help='The name of the dataset in DAS. Will be guessed if not specified')
    def query(self, dataset):
        """Query DAS to find out how many events are in the dataset"""
        host = self.options.host
        debug = self.options.verbose
        idx = self.options.idx
        limit = self.options.limit

        def check(ds):
            query = 'dataset=%s' % ds
            result = Das.get_data(host, query, idx, limit, debug)
            result = result.replace('null','None')
            result = result.replace('true','True')
            result = result.replace('false','False')
            data = eval(result)
            if data['status'] != 'ok':
                raise Exception("Das query failed: Output is '%s'" % data)
            return (data['data'],data)

        data = None
        exists = False

        if self.options.name is None:
            #guess the dataset name in DAS from the sample path
            tokens = [t for t in dataset.split(os.sep) if t]
            if tokens:
                ds = os.sep + os.sep.join(tokens[0:3])
                if ds:
                    exists, data = check(ds)
                    self.options.name = ds
        else:
            exists, data = check(self.options.name)
            if not exists:
                raise Exception("Specified dataset '%s' not found in Das. Please check." % self.options.name)

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        return data

    def run(self, input):
        output = {}
        if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
            output = self.query(self.dataset)
        return {'Name':self.options.name,'Das':output}
class GZipFiles(Task):
    """GZip a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GZipFiles', dataset, user, options)
    def gzipFile(self, fileName):
        output = '%s.gz' % fileName

        f_in = open(fileName, 'rb')
        f_out = gzip.open(output, 'wb')
        f_out.writelines(f_in)
        f_out.close()
        f_in.close()

        #remove the original file once it has been compressed
        os.remove(fileName)
        return output

    def run(self, input):
        files = input['FilesToCompress']['Files']

        compressed = []
        for f in files:
            if f is None or not f: continue
            if os.path.exists(f):
                gz = self.gzipFile(f)
                compressed.append(gz)
        return {'CompressedFiles':compressed}
class CleanFiles(Task):
    """Remove a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanFiles', dataset, user, options)
    def run(self, input):
        files = input['FilesToClean']['Files']
        removed = []
        for f in files:
            if f is None or not f: continue
            if os.path.exists(f): os.remove(f)
            removed.append(f)
        return {'CleanedFiles':removed}
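# Hedged usage sketch (not from the original module; the file names and the
# '_exampleCompressAndClean' helper are invented for illustration). It shows the
# 'actions' dictionary shape that GZipFiles and CleanFiles expect, the same shape
# built further down by MonitorJobs and CleanJobFiles.
def _exampleCompressAndClean(dataset, user, options):
    """Compress one log file and remove one temporary file with the tasks above."""
    actions = {'FilesToCompress':{'Files':['LSFJOB_12345/STDOUT']},
               'FilesToClean':{'Files':['test_cfg.py']}}
    compressed = GZipFiles(dataset, user, options).run(actions)   # {'CompressedFiles': [...]}
    removed = CleanFiles(dataset, user, options).run(actions)     # {'CleanedFiles': [...]}
    return compressed, removed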
class FindOnCastor(Task):
    """Checks that the sample specified exists in the CASTOR area of the user specified. The directory must exist."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FindOnCastor', dataset, user, options)
    def run(self, input):
        if self.user == 'CMS':
            #official samples are read via DBS, so there is no user directory to find
            return {'Topdir':None,'Directory':None}

        topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
        directory = '%s/%s' % (topdir,self.dataset)
        if not castortools.fileExists(directory):
            if hasattr(self,'create') and self.create:
                castortools.createCastorDir(directory)
        if not castortools.isDirectory(directory):
            raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
        return {'Topdir':topdir,'Directory':directory}
class CheckForMask(Task):
    """Tests if a file mask, created by edmIntegrityCheck.py, is already present and reads it if so."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-c", "--check", dest="check", default=False, action='store_true',
                          help='Check filemask if available')
    def run(self, input):
        #official samples have no user file mask: the file list comes from DBS
        if self.user == 'CMS':
            return {'MaskPresent':True,'Report':'Files taken from DBS'}

        dir = input['FindOnCastor']['Directory']
        mask = "IntegrityCheck"

        report = None
        if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
            file_mask = castortools.matchingFiles(dir, r'^%s_.*\.txt$' % mask)
            if file_mask:
                p = PublishToFileSystem(mask)
                report = p.get(dir)
        return {'MaskPresent':report is not None,'Report':report}
class CheckForWrite(Task):
    """Checks whether you have write access to the CASTOR directory specified"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForWrite', dataset, user, options)
    def run(self, input):
        """Check that the directory is writable"""
        if self.user == 'CMS':
            return {'Directory':None,'WriteAccess':True}
        dir = input['FindOnCastor']['Directory']
        if self.options.check:

            _, name = tempfile.mkstemp('.txt',text=True)
            testFile = open(name,'w')
            testFile.write('Test file')
            testFile.close()

            store = castortools.castorToLFN(dir)
            #stage the test file out and check that it arrived
            if not os.system('cmsStage %s %s' % (name,store)):
                fname = '%s/%s' % (dir,os.path.basename(name))
                write = castortools.fileExists(fname)
                if write:
                    castortools.rm(fname)
                else:
                    raise Exception("Failed to write to directory '%s'" % dir)
            os.remove(name)
        return {'Directory':dir,'WriteAccess':True}
class GenerateMask(Task):
    """Uses edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GenerateMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-r", "--recursive", dest="resursive", default=False, action='store_true',
                          help='Walk the mass storage device recursively')
        parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true',
                          help='Print a report to stdout')
    def run(self, input):

        report = None
        if self.options.check and not input['CheckForMask']['MaskPresent']:

            options = copy.deepcopy(self.options)
            options.user = self.user

            if 'BaseDataset' in input:
                options.name = input['BaseDataset']['Name']
            else:
                options.name = None

            check = IntegrityCheck(self.dataset,options)
            check.test()
            report = check.structured()
            pub = PublishToFileSystem(check)
            pub.publish(report)
        elif input['CheckForMask']['MaskPresent']:
            report = input['CheckForMask']['Report']

        return {'MaskPresent':report is not None,'Report':report}
class CreateJobDirectory(Task):
    """Generates a job directory on your local drive"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CreateJobDirectory', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-o", "--output", dest="output", default=None,
                          help='The directory to use locally for job files')
    def run(self, input):
        if self.options.output is not None:
            output = self.options.output
        else:
            #name the directory after the dataset and the current timestamp
            output = '%s_%s' % (self.dataset,datetime.datetime.now().strftime("%s"))
            output = output.lstrip('/')
        if not os.path.exists(output):
            mkdir_p(output)
        return {'JobDir':output,'PWD':os.getcwd()}
class SourceCFG(Task):
    """Generate a source CFG using 'sourceFileList.py' by listing the CASTOR directory specified. Applies the file wildcard, '--wildcard'"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'SourceCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--min-run", dest="min_run", default=-1, type=int,
                          help='When querying DBS, require runs >= this run')
        parser.add_option("--max-run", dest="max_run", default=-1, type=int,
                          help='When querying DBS, require runs <= this run')
        parser.add_option("--input-prescale", dest="prescale", default=1, type=int,
                          help='Randomly prescale the number of good files by this factor.')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range=run_range)
        good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
        bad_files = [fname for fname in data.listOfFiles() if fname not in good_files]

        source = os.path.join(jobdir,'source_cfg.py')
        output = open(source,'w')
        output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files),len(bad_files),self.options.prescale) )
        output.write('files = ' + str(good_files) + '\n')
        for bad_file in bad_files:
            output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
        output.close()

        return {'SourceCFG':source}
def insertLines( insertedTo, toInsert ):
    '''Insert a sequence into another sequence.

    The sequence is inserted either at the end, or at the position
    of the HOOK, if it is found.
    The HOOK is considered as being found if
       str(elem).find(###ProductionTaskHook$$$)
    is true for one of the elements in the insertedTo sequence.
    '''
    HOOK = '###ProductionTaskHook$$$'
    hookIndex = None
    for index, line in enumerate(insertedTo):
        if line.find(HOOK)>-1:
            hookIndex = index
            break

    if hookIndex is not None:
        before = insertedTo[:hookIndex]
        after = insertedTo[hookIndex:]
        result = before + toInsert + after
        return result
    else:
        insertedTo.extend( toInsert )
        return insertedTo
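# Hedged usage sketch for insertLines (not part of the original module; the list
# contents and the '_exampleInsertLines' name are invented for illustration).
def _exampleInsertLines():
    """Show where toInsert ends up with and without the HOOK marker present."""
    appended = insertLines(['a\n', 'b\n'], ['x\n'])
    # -> ['a\n', 'b\n', 'x\n']  (no hook: appended at the end)
    hooked = insertLines(['a\n', '###ProductionTaskHook$$$\n', 'b\n'], ['x\n'])
    # -> ['a\n', 'x\n', '###ProductionTaskHook$$$\n', 'b\n']  (inserted before the hook line)
    return appended, hooked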
class FullCFG(Task):
    """Generate the full CFG needed to run the job and write it to the job directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FullCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
        parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']

        if self.options.cfg is None or not os.path.exists(self.options.cfg):
            raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)

        config = open(self.options.cfg).readlines()
        sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
        if sourceFile.lower().endswith('.py'):
            sourceFile = sourceFile[:-3]

        source = os.path.join(jobdir,'full_cfg.py')
        output = open(source,'w')

        nEventsPerJob = -1
        if self.options.nEventsPerJob:
            nEventsPerJob = int(self.options.nEventsPerJob)

        toInsert = ['\nfrom %s import *\n' % sourceFile,
                    'process.source.fileNames = files\n',
                    'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
                    'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n'
                    'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard) )
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()

        return {'FullCFG':source}
class CheckConfig(Task):
    """Check the basic syntax of a CFG file by running python on it."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']

        child = subprocess.Popen(['python',full], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        if child.returncode != 0:
            raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr,child.returncode))
        return {'Status':'VALID'}
class RunTestEvents(Task):
    """Run cmsRun but with a small number of events on the job CFG."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunTestEvents', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).readlines()
        source = os.path.join(jobdir,'test_cfg.py')
        output = open(source,'w')
        toInsert = [
            'process.maxEvents.input = cms.untracked.int32(5)\n',
            'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
            ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['cmsRun',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr,child.returncode)
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'Status':'VALID','TestCFG':source}
class ExpandConfig(Task):
    """Runs edmConfigDump to produce an expanded cfg file"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'ExpandConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).read()
        source = os.path.join(jobdir,'test_cfg.py')
        expanded = 'Expanded%s' % os.path.basename(full)
        output = open(source,'w')
        output.write(config)
        output.write("open('%s','w').write(process.dumpPython())\n" % expanded)
        output.close()

        pwd = os.getcwd()

        result = {}
        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['python',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to edmConfigDump. Error was '%s' (%i)." % (stderr,child.returncode)
            result['ExpandedFullCFG'] = os.path.join(jobdir,expanded)
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return result
class WriteToDatasets(Task):
    """Publish the sample to 'Datasets.txt' if required"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteToDatasets', dataset, user, options)
    def run(self, input):
        name = "%s/%s" % (self.dataset,self.options.tier)
        name = name.replace('//','/')
        user = self.options.batch_user
        added = addToDatasets(name, user=user)
        return {'Added':added, 'Name':name, 'User':user}
class RunCMSBatch(Task):
    """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunCMSBatch', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
        parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true',
                          help='Run on the batch system')
        parser.add_option("-N", "--numberOfInputFiles", dest="nInput",
                          help="Number of input files per job", default=5, type=int)
        parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
        parser.add_option("-t", "--tier", dest="tier", default="",
                          help="Tier: extension you can give to specify you are doing a new production. If you give a Tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.")
        parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)
    def run(self, input):
        full = input['ExpandConfig']['ExpandedFullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        out = input['FindOnCastor']
        sampleDir = os.path.join(out['Directory'],self.options.tier)
        sampleDir = castortools.castorToLFN(sampleDir)

        cmd = ['cmsBatch.py',str(self.options.nInput),os.path.basename(full),'-o','%s_Jobs' % self.options.tier,'--force']
        cmd.extend(['-r',sampleDir])
        if self.options.run_batch:
            jname = "%s/%s" % (self.dataset,self.options.tier)
            jname = jname.replace("//","/")
            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group
            cmd.extend(['-b',"'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue,jname,user_group)])

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)
            returncode = os.system(" ".join(cmd))
            if returncode != 0:
                error = "Running cmsBatch failed. Return code was %i." % returncode
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'SampleDataset':"%s/%s" % (self.dataset,self.options.tier),'BatchUser':self.options.batch_user,
                'SampleOutputDir':sampleDir,'LSFJobsTopDir':os.path.join(jobdir,'%s_Jobs' % self.options.tier)}
class MonitorJobs(Task):
    """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'MonitorJobs', dataset, user, options)

    def getjobid(self, job_dir):
        """Parse the LSF output to find the job id"""
        input = os.path.join(job_dir,'job_id.txt')
        result = None
        if os.path.exists(input):
            contents = open(input).read()
            for c in contents.split('\n'):
                if c and re.match('^Job <\\d*> is submitted to queue <.*>',c) is not None:
                    try:
                        result = c.split('<')[1].split('>')[0]
                    except Exception as e:
                        print('Job ID parsing error',str(e),c, file=sys.stderr)
        return result

    def monitor(self, jobs, previous):

        #run bjobs for the jobs we know about
        cmd = ['bjobs','-u',self.options.batch_user]
        cmd.extend([v for v in jobs.values() if v is not None])
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()

        def parseHeader(header):
            """Parse the header from bjobs"""
            tokens = [t for t in header.split(' ') if t]
            result = {}
            for i in range(len(tokens)):
                result[tokens[i]] = i
            return result

        result = {}
        if stdout:
            lines = stdout.split('\n')
            if lines:
                header = parseHeader(lines[0])
                if not 'STAT' in header or not 'JOBID' in header:
                    print('Problem parsing bjobs header\n',lines, file=sys.stderr)
                    return result
                for line in lines[1:]:
                    #parse the status of each job from the bjobs table
                    tokens = [t for t in line.split(' ') if t]
                    if len(tokens) < len(header): continue
                    id = tokens[header['JOBID']]
                    user = tokens[header['USER']]
                    status = tokens[header['STAT']]

                    result[id] = status

        if stderr:
            lines = stderr.split('\n')
            for line in lines:
                if line and re.match('^Job <\\d*> is not found',line) is not None:
                    try:
                        id = line.split('<')[1].split('>')[0]
                        if id not in result and id not in previous:
                            result[id] = 'FORGOTTEN'
                    except Exception as e:
                        print('Job ID parsing error in STDERR',str(e),line, file=sys.stderr)

        #fall back to the previous state for jobs that bjobs no longer reports
        for id in jobs.values():
            if id not in result and id in previous:
                result[id] = previous[id]

        return result

    def run(self, input):

        jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
        if not os.path.exists(jobsdir):
            raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)

        subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
        jobs = {}
        for s in subjobs:
            jobs[s] = self.getjobid(s)

        def checkStatus(stat):

            #gzip the stdout and stderr of finished jobs on the fly
            actions = {'FilesToCompress':{'Files':[]}}

            result = {}
            for j, id in six.iteritems(jobs):
                result[j] = 'UNKNOWN'
                if id is not None and id in stat:
                    result[j] = stat[id]
                if result[j] in ['DONE','EXIT','FORGOTTEN']:
                    stdout = os.path.join(j,'LSFJOB_%s' % id,'STDOUT')
                    if os.path.exists(stdout):
                        #compress this file
                        actions['FilesToCompress']['Files'].append(stdout)
                        result[j] = '%s.gz' % stdout
                    elif os.path.exists('%s.gz' % stdout):
                        result[j] = '%s.gz' % stdout
                    else:
                        result[j] = 'NOSTDOUT'

                    #also compress the stderr, although this is mostly empty
                    stderr = os.path.join(j,'LSFJOB_%s' % id,'STDERR')
                    if os.path.exists(stderr):
                        actions['FilesToCompress']['Files'].append(stderr)

            compress = GZipFiles(self.dataset,self.user,self.options)
            compress.run(actions)
            return result

        def countJobs(stat):
            """Count jobs that are monitorable - i.e. not in a final state"""
            result = []
            for j, id in six.iteritems(jobs):
                if id is not None and id in stat:
                    st = stat[id]
                    if st in ['PEND','PSUSP','RUN','USUSP','SSUSP','WAIT']:
                        result.append(id)
            return result

        def writeKillScript(mon):
            """Write a shell script to kill the jobs we know about"""
            kill = os.path.join(jobsdir,'kill_jobs.sh')
            output = open(kill,'w')
            script = """#!/usr/bin/env bash
bkill -u %s %s
""" % (self.options.batch_user, " ".join(mon))
            output.write(script)
            output.close()
            return mon

        #keep polling until no job is left in a non-final state
        count = 0
        status = self.monitor(jobs,{})
        monitorable = writeKillScript(countJobs(status))

        while monitorable:
            job_status = checkStatus(status)
            time.sleep(60)
            status = self.monitor(jobs,status)
            monitorable = writeKillScript(countJobs(status))
            if not (count % 3):
                print('%s: Monitoring %i jobs (%s)' % (self.name,len(monitorable),self.dataset))
            count += 1

        return {'LSFJobStatus':checkStatus(status),'LSFJobIDs':jobs}
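# Hedged sketch of the structure MonitorJobs returns (the directories and ids are
# invented, and the '_exampleMonitorJobsOutput' helper is not in the original file):
# each local Job_N directory maps either to a final LSF state or to the path of its
# gzipped STDOUT, and 'LSFJobIDs' holds the ids parsed from job_id.txt.
def _exampleMonitorJobsOutput():
    """Return a plausible MonitorJobs result, for illustration only."""
    return {'LSFJobStatus': {'Job_0': 'Job_0/LSFJOB_12345/STDOUT.gz',
                             'Job_1': 'NOSTDOUT',
                             'Job_2': 'UNKNOWN'},
            'LSFJobIDs':    {'Job_0': '12345', 'Job_1': '12346', 'Job_2': None}}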
class CheckJobStatus(Task):
    """Checks the job STDOUT to catch common problems like exceptions, CPU time exceeded. Sets the job status in the report accordingly."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckJobStatus', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--output_wildcard", dest="output_wildcard", default=None,
                          help="The wildcard to use when testing the output of this production (defaults to same as -w)")
    def run(self, input):

        job_status = input['MonitorJobs']['LSFJobStatus']

        result = {}
        for j, status in six.iteritems(job_status):
            valid = True
            if os.path.exists(status):

                #the status points to the job STDOUT, possibly gzipped
                if status.endswith('.gz') or status.endswith('.GZ'):
                    fileHandle = gzip.GzipFile(status)
                else:
                    fileHandle = open(status)

                open_count = 0
                close_count = 0
                for line in fileHandle:

                    #count files opened and closed by the framework
                    if 'pened file' in line:
                        open_count += 1
                    if 'losed file' in line:
                        close_count += 1

                    if 'Exception' in line:
                        result[j] = 'Exception'
                        valid = False
                    elif 'CPU time limit exceeded' in line:
                        result[j] = 'CPUTimeExceeded'
                        valid = False
                    elif 'Killed' in line:
                        result[j] = 'JobKilled'
                        valid = False
                    elif 'A fatal system signal has occurred' in line:
                        result[j] = 'SegFault'
                        valid = False

                if valid and open_count != close_count:
                    result[j] = 'FileOpenCloseMismatch'
                    valid = False
                if valid:
                    result[j] = 'VALID'
            else:
                result[j] = status

        #allow a different wildcard when checking the output files
        options = copy.deepcopy(self.options)
        if self.options.output_wildcard is not None:
            options.wildcard = self.options.output_wildcard

        mask = GenerateMask(input['RunCMSBatch']['SampleDataset'],self.options.batch_user,options)
        report = mask.run({'CheckForMask':{'MaskPresent':False}})
        report['LSFJobStatusCheck'] = result
        return report
'WriteJobReport', dataset, user, options)
808 report = input[
'CheckJobStatus']
812 for j, status
in six.iteritems(report[
'LSFJobStatusCheck']):
813 if status
not in states:
816 jobdir = input[
'CreateJobDirectory'][
'PWD']
817 if not os.path.exists(jobdir):
818 raise Exception(
"Top level job directory not found: '%s'" % jobdir)
819 report_file = os.path.join(input[
'CreateJobDirectory'][
'JobDir'],
'resubmit.sh')
821 output =
file(report_file,
'w')
822 output.write(
'#!/usr/bin/env bash\n')
824 if report[
'MaskPresent']:
825 mask = report[
'Report']
826 output.write(
'#PrimaryDatasetFraction: %f\n' % mask[
'PrimaryDatasetFraction'])
827 output.write(
'#FilesGood: %i\n' % mask[
'FilesGood'])
828 output.write(
'#FilesBad: %i\n' % mask[
'FilesBad'])
831 if self.options.group
is not None:
832 user_group =
'-G %s' % self.options.group
834 for status, jobs
in six.iteritems(states):
835 output.write(
'# %d jobs found in state %s\n' % (len(jobs),status) )
836 if status ==
'VALID':
839 jdir = os.path.join(jobdir,j)
840 output.write(
'pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir,self.options.queue,user_group))
843 return {
'SummaryFile':report_file}
class CleanJobFiles(Task):
    """Removes and compresses auto-generated files from the job directory to save space."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanJobFiles', dataset, user, options)
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        jobs = input['MonitorJobs']['LSFJobIDs']
        job_status = input['MonitorJobs']['LSFJobStatus']

        actions = {'FilesToCompress':{'Files':[]},'FilesToClean':{'Files':[]}}

        actions['FilesToClean']['Files'].append(input['ExpandConfig']['ExpandedFullCFG'])
        if 'RunTestEvents' in input:
            actions['FilesToClean']['Files'].append(input['RunTestEvents']['TestCFG'])

        for rt in glob.iglob('%s/*.root' % jobdir):
            actions['FilesToClean']['Files'].append(rt)
        for pyc in glob.iglob('%s/*.pyc' % jobdir):
            actions['FilesToClean']['Files'].append(pyc)

        for j in jobs:
            status = job_status[j]
            if os.path.exists(status) and not status.endswith('.gz'):
                actions['FilesToCompress']['Files'].append(status)

        compress = GZipFiles(self.dataset,self.user,self.options)
        compressed = compress.run(actions)

        clean = CleanFiles(self.dataset,self.user,self.options)
        removed = clean.run(actions)
        return {'Cleaned':removed,'Compressed':compressed}