11 from multiprocessing
import Pool
12 from pprint
import pprint
15 if "-i" not in sys.argv:
19 ROOT.gROOT.SetBatch(
True)
23 from PhysicsTools.HeppyCore.framework.looper
import Looper
30 print 'production done:', str(result)
34 loop =
runLoop( comp, outDir, copy.copy(sys.modules[configName].config), options)
38 print "ERROR processing component %s" % comp.name
41 print traceback.format_exc()
44 def runLoop( comp, outDir, config, options):
45 fullName =
'/'.
join( [outDir, comp.name ] )
47 config.components = [comp]
48 loop = Looper( fullName,
51 nPrint = options.nprint,
52 timeReport = options.timeReport,
53 quiet = options.quiet)
55 if options.iEvent
is None:
61 iEvent = int(options.iEvent)
62 loop.process( iEvent )
67 '''Creates the output dir, dealing with the case where dir exists.'''
73 print 'directory %s already exists' % dir
75 dirlist = [path
for path
in os.listdir(dir)
if os.path.isdir(
'/'.
join([dir, path]) )]
77 print 'component list: '
78 print [comp.name
for comp
in components]
80 print 'force mode, continue.'
83 while answer
not in [
'Y',
'y',
'yes',
'N',
'n',
'no']:
84 answer = raw_input(
'Continue? [y/n]')
85 if answer.lower().startswith(
'n'):
87 elif answer.lower().startswith(
'y'):
90 raise ValueError(
' '.
join([
'answer can not have this value!',
94 return [l[i:i+n]
for i
in range(0, len(l), n)]
100 if hasattr( comp,
'fineSplitFactor')
and comp.fineSplitFactor>1:
101 subchunks = range(comp.fineSplitFactor)
102 for ichunk, chunk
in enumerate([(f,i)
for f
in comp.files
for i
in subchunks]):
103 newComp = copy.deepcopy(comp)
104 newComp.files = [chunk[0]]
105 newComp.fineSplit = ( chunk[1], comp.fineSplitFactor )
106 newComp.name =
'{name}_Chunk{index}'.
format(name=newComp.name,
108 splitComps.append( newComp )
109 elif hasattr( comp,
'splitFactor')
and comp.splitFactor>1:
110 chunkSize = len(comp.files) / comp.splitFactor
111 if len(comp.files) % comp.splitFactor:
114 for ichunk, chunk
in enumerate(
chunks( comp.files, chunkSize)):
115 newComp = copy.deepcopy(comp)
116 newComp.files = chunk
117 newComp.name =
'{name}_Chunk{index}'.
format(name=newComp.name,
119 splitComps.append( newComp )
121 splitComps.append( comp )
125 _heppyGlobalOptions = {}
128 global _heppyGlobalOptions
129 return _heppyGlobalOptions[name]
if name
in _heppyGlobalOptions
else default
135 print 'ERROR: please provide the processing name and the component list'
139 if os.path.exists(outDir)
and not os.path.isdir( outDir ):
141 print 'ERROR: when it exists, first argument must be a directory.'
143 cfgFileName = args[1]
144 if not os.path.isfile( cfgFileName ):
146 print 'ERROR: second argument must be an existing file (your input cfg).'
151 logging.basicConfig(level=logging.INFO)
156 from PhysicsTools.HeppyCore.framework.heppy
import _heppyGlobalOptions
157 for opt
in options.extraOptions:
159 (key,val) = opt.split(
"=",1)
160 _heppyGlobalOptions[key] = val
162 _heppyGlobalOptions[opt] =
True
164 file = open( cfgFileName,
'r' )
165 cfg = imp.load_source( 'PhysicsTools.HeppyCore.__cfg_to_run__', cfgFileName, file)
167 selComps = [comp
for comp
in cfg.config.components
if len(comp.files)>0]
168 selComps =
split(selComps)
172 print "WARNING: too many threads {tnum}, will just use a maximum of 10.".
format(tnum=len(selComps))
177 shutil.copy( cfgFileName, outDir )
178 pool = Pool(processes=
min(len(selComps),10))
180 import PhysicsTools.HeppyCore.framework.heppy
as ML
181 for comp
in selComps:
182 print 'submitting', comp.name
183 pool.apply_async( ML.runLoopAsync, [comp, outDir,
'PhysicsTools.HeppyCore.__cfg_to_run__', options],
184 callback=ML.callBack)
191 loop =
runLoop( comp, outDir, cfg.config, options )
195 if __name__ ==
'__main__':
196 from optparse
import OptionParser
198 parser = OptionParser()
200 %prog <name> <analysis_cfg>
201 For each component, start a Loop.
202 'name' is whatever you want.
205 parser.add_option(
"-N",
"--nevents",
208 help=
"number of events to process",
210 parser.add_option(
"-p",
"--nprint",
212 help=
"number of events to print at the beginning",
214 parser.add_option(
"-e",
"--iEvent",
216 help=
"jump to a given event. ignored in multiprocessing.",
218 parser.add_option(
"-f",
"--force",
221 help=
"don't ask questions in case output directory already exists.",
223 parser.add_option(
"-i",
"--interactive",
226 help=
"stay in the command line prompt instead of exiting",
228 parser.add_option(
"-t",
"--timereport",
231 help=
"Make a report of the time used by each analyzer",
233 parser.add_option(
"-v",
"--verbose",
236 help=
"increase the verbosity of the output (from 'warning' to 'info' level)",
238 parser.add_option(
"-q",
"--quiet",
241 help=
"do not print log messages to screen.",
243 parser.add_option(
"-o",
"--option",
248 help=
"Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")
250 (options,args) = parser.parse_args()
253 if not options.interactive:
static std::string join(char **cmd)