11 from multiprocessing
import Pool
12 from pprint
import pprint
15 if "-i" not in sys.argv:
19 ROOT.gROOT.SetBatch(
True)
23 from PhysicsTools.HeppyCore.framework.looper
import Looper
30 print 'production done:', str(result)
34 loop =
runLoop( comp, outDir, copy.copy(sys.modules[configName].config), options)
38 print "ERROR processing component %s" % comp.name
41 print traceback.format_exc()
44 def runLoop( comp, outDir, config, options):
45 fullName =
'/'.
join( [outDir, comp.name ] )
47 config.components = [comp]
48 loop = Looper( fullName,
51 nPrint = options.nprint,
52 timeReport = options.timeReport)
54 if options.iEvent
is None:
60 iEvent = int(options.iEvent)
61 loop.process( iEvent )
66 '''Creates the output dir, dealing with the case where dir exists.'''
72 print 'directory %s already exists' % dir
74 dirlist = [path
for path
in os.listdir(dir)
if os.path.isdir(
'/'.
join([dir, path]) )]
76 print 'component list: '
77 print [comp.name
for comp
in components]
79 print 'force mode, continue.'
82 while answer
not in [
'Y',
'y',
'yes',
'N',
'n',
'no']:
83 answer = raw_input(
'Continue? [y/n]')
84 if answer.lower().startswith(
'n'):
86 elif answer.lower().startswith(
'y'):
89 raise ValueError(
' '.
join([
'answer can not have this value!',
93 return [l[i:i+n]
for i
in range(0, len(l), n)]
99 if hasattr( comp,
'fineSplitFactor')
and comp.fineSplitFactor>1:
100 subchunks = range(comp.fineSplitFactor)
101 for ichunk, chunk
in enumerate([(f,i)
for f
in comp.files
for i
in subchunks]):
102 newComp = copy.deepcopy(comp)
103 newComp.files = [chunk[0]]
104 newComp.fineSplit = ( chunk[1], comp.fineSplitFactor )
105 newComp.name =
'{name}_Chunk{index}'.
format(name=newComp.name,
107 splitComps.append( newComp )
108 elif hasattr( comp,
'splitFactor')
and comp.splitFactor>1:
109 chunkSize = len(comp.files) / comp.splitFactor
110 if len(comp.files) % comp.splitFactor:
113 for ichunk, chunk
in enumerate(
chunks( comp.files, chunkSize)):
114 newComp = copy.deepcopy(comp)
115 newComp.files = chunk
116 newComp.name =
'{name}_Chunk{index}'.
format(name=newComp.name,
118 splitComps.append( newComp )
120 splitComps.append( comp )
129 print 'ERROR: please provide the processing name and the component list'
133 if os.path.exists(outDir)
and not os.path.isdir( outDir ):
135 print 'ERROR: when it exists, first argument must be a directory.'
137 cfgFileName = args[1]
138 if not os.path.isfile( cfgFileName ):
140 print 'ERROR: second argument must be an existing file (your input cfg).'
143 file = open( cfgFileName,
'r' )
144 cfg = imp.load_source( 'PhysicsTools.HeppyCore.__cfg_to_run__', cfgFileName, file)
146 selComps = [comp
for comp
in cfg.config.components
if len(comp.files)>0]
147 selComps =
split(selComps)
148 for comp
in selComps:
151 print "WARNING: too many threads {tnum}, will just use a maximum of 10.".
format(tnum=len(selComps))
156 shutil.copy( cfgFileName, outDir )
157 pool = Pool(processes=
min(len(selComps),10))
159 import PhysicsTools.HeppyCore.framework.heppy
as ML
160 for comp
in selComps:
161 print 'submitting', comp.name
162 pool.apply_async( ML.runLoopAsync, [comp, outDir,
'PhysicsTools.HeppyCore.__cfg_to_run__', options],
163 callback=ML.callBack)
170 loop =
runLoop( comp, outDir, cfg.config, options )
174 if __name__ ==
'__main__':
175 from optparse
import OptionParser
177 parser = OptionParser()
179 %prog <name> <analysis_cfg>
180 For each component, start a Loop.
181 'name' is whatever you want.
184 parser.add_option(
"-N",
"--nevents",
186 help=
"number of events to process",
188 parser.add_option(
"-p",
"--nprint",
190 help=
"number of events to print at the beginning",
192 parser.add_option(
"-e",
"--iEvent",
194 help=
"jump to a given event. ignored in multiprocessing.",
196 parser.add_option(
"-f",
"--force",
199 help=
"don't ask questions in case output directory already exists.",
201 parser.add_option(
"-i",
"--interactive",
204 help=
"stay in the command line prompt instead of exiting",
206 parser.add_option(
"-t",
"--timereport",
209 help=
"Make a report of the time used by each analyzer",
214 (options,args) = parser.parse_args()
217 if not options.interactive:
static std::string join(char **cmd)