CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
heppy.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2014 Colin Bernet
3 # https://github.com/cbernet/heppy/blob/master/LICENSE
4 
5 import os
6 import shutil
7 import glob
8 import sys
9 import imp
10 import copy
11 from multiprocessing import Pool
12 from pprint import pprint
13 
# Import ROOT in batch mode if "-i" is not among the options.
# sys.argv is replaced by a single "-b-" entry while ROOT is imported
# (presumably so that ROOT does not interpret this script's own options --
# TODO confirm against the PyROOT documentation).
if "-i" not in sys.argv:
    oldv = sys.argv[:]
    sys.argv = [ "-b-"]
    try:
        import ROOT
        ROOT.gROOT.SetBatch(True)
    finally:
        # Always hand the real command line back, even if the import or the
        # SetBatch call raises; otherwise later option parsing would see "-b-".
        sys.argv = oldv
21 
22 
23 from PhysicsTools.HeppyCore.framework.looper import Looper
24 
25 # global, to be used interactively when only one component is processed.
26 loop = None
27 
def callBack( result ):
    '''Print a completion message for one finished asynchronous job.

    result -- value returned by the worker; it is converted with str()
              before being printed.
    '''
    # Single-argument parenthesised print: same output under Python 2 and 3.
    print('production done: ' + str(result))
31 
def runLoopAsync(comp, outDir, configName, options):
    '''Process one component and return the name of the resulting loop.

    comp       -- component to process.
    outDir     -- output directory handed on to runLoop.
    configName -- key in sys.modules of the already imported configuration
                  module; its ``config`` attribute is used.
    options    -- parsed command-line options, handed on to runLoop.

    Any exception is reported on stdout (component name, component, stack
    trace) and then re-raised.
    '''
    try:
        # Shallow copy: runLoop rebinds ``components`` on the object it gets,
        # which must not touch the config stored in the module.
        workerConfig = copy.copy(sys.modules[configName].config)
        return runLoop(comp, outDir, workerConfig, options).name
    except Exception:
        import traceback
        print("ERROR processing component %s" % comp.name)
        print(comp)
        print("STACK TRACE: ")
        print(traceback.format_exc())
        raise
43 
def runLoop( comp, outDir, config, options):
    '''Build a Looper for a single component, run it and return it.

    comp    -- component to process; config.components is set to [comp].
    outDir  -- directory under which the loop is named "<outDir>/<comp.name>".
    config  -- configuration object passed to the Looper (modified in place).
    options -- object providing nevents, nprint, timeReport and iEvent.

    When options.iEvent is None the whole loop is run and written;
    otherwise only the event with that index is processed.
    '''
    fullName = '/'.join([outDir, comp.name])
    config.components = [comp]
    looper = Looper(fullName,
                    config,
                    options.nevents, 0,
                    nPrint=options.nprint,
                    timeReport=options.timeReport)
    print(looper)
    if options.iEvent is not None:
        # Single-event mode: nothing is written out.
        looper.process(int(options.iEvent))
        return looper
    looper.loop()
    looper.write()
    print(looper)
    return looper
63 
64 
def createOutputDir(dir, components, force):
    '''Create the output dir, dealing with the case where dir exists.

    dir        -- path of the directory to create.
    components -- objects with a ``name`` attribute; their names are only
                  printed to help the user decide.
    force      -- when exactly True, an existing directory is reused without
                  asking.

    Returns True if processing may continue (directory created, force mode,
    or the user answered yes) and False if the user answered no.
    Raises OSError if the directory cannot be created and does not already
    exist as a directory.
    '''
    try:
        os.mkdir(dir)
        return True
    except OSError:
        # mkdir can fail for reasons other than "already there" (missing
        # parent, permissions, ...); only an existing directory is handled.
        if not os.path.isdir(dir):
            raise

    print('directory %s already exists' % dir)
    print('contents: ')
    subDirs = [path for path in os.listdir(dir)
               if os.path.isdir('/'.join([dir, path]))]
    pprint(subDirs)
    print('component list: ')
    print([comp.name for comp in components])

    if force is True:
        print('force mode, continue.')
        return True

    # raw_input only exists in Python 2; input is its Python 3 equivalent.
    try:
        ask = raw_input
    except NameError:
        ask = input

    answer = None
    while answer not in ['Y','y','yes','N','n','no']:
        answer = ask('Continue? [y/n]')
    # Every accepted answer starts with 'y' or 'n' (case-insensitively).
    return answer.lower().startswith('y')
91 
def chunks(l, n):
    '''Return the consecutive slices of l, each holding at most n items.'''
    pieces = []
    for start in range(0, len(l), n):
        pieces.append(l[start:start + n])
    return pieces
94 
def _chunkCopy(comp, index, files):
    '''Deep-copy comp, assign it files and append "_Chunk<index>" to its name.'''
    newComp = copy.deepcopy(comp)
    newComp.files = files
    newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                index=index)
    return newComp


def split(comps):
    '''Expand components into chunk components according to their split factors.

    For each component, in order:
    - fineSplitFactor > 1: one chunk per (file, sub-chunk) pair; each chunk
      holds a single file and a ``fineSplit`` tuple
      (sub-chunk index, fineSplitFactor).
    - otherwise splitFactor > 1: the files are cut into consecutive groups of
      ceil(len(files) / splitFactor) files, one chunk per group.
    - otherwise the component itself (not a copy) is kept.

    Chunks are deep copies named "<name>_Chunk<index>".  A component that is
    to be split but has no files yields no chunk at all.

    Returns the list of resulting components.
    '''
    splitComps = []
    for comp in comps:
        if hasattr( comp, 'fineSplitFactor') and comp.fineSplitFactor>1:
            pairs = [(f, i) for f in comp.files
                     for i in range(comp.fineSplitFactor)]
            for ichunk, (fileName, subIndex) in enumerate(pairs):
                newComp = _chunkCopy(comp, ichunk, [fileName])
                newComp.fineSplit = ( subIndex, comp.fineSplitFactor )
                splitComps.append( newComp )
        elif hasattr( comp, 'splitFactor') and comp.splitFactor>1:
            nFiles = len(comp.files)
            # Floor division keeps the chunk size an integer on Python 3 too.
            chunkSize = nFiles // comp.splitFactor
            if nFiles % comp.splitFactor:
                chunkSize += 1
            # A zero step would make range() raise when there are no files.
            chunkSize = max(chunkSize, 1)
            starts = range(0, nFiles, chunkSize)
            for ichunk, start in enumerate(starts):
                splitComps.append(
                    _chunkCopy(comp, ichunk,
                               comp.files[start:start + chunkSize]) )
        else:
            splitComps.append( comp )
    return splitComps
122 
123 
124 
def main( options, args ):
    '''Load the configuration, split its components and process them.

    options -- parsed command-line options (force, nevents, nprint, iEvent,
               timeReport are read here or further down the call chain).
    args    -- positional arguments: [output directory, configuration file].

    With more than one component after splitting, the configuration file is
    copied to the output directory and the components are processed in a
    multiprocessing Pool of at most 10 processes.  With exactly one
    component, it is processed in this process and the resulting loop is
    stored in the module-level ``loop`` variable.

    Exits through sys.exit with code 1, 2 or 3 on invalid arguments, 4 when
    no component with input files is left, and 0 when the user declines to
    reuse an existing output directory.

    NOTE(review): relies on the module-level ``parser`` that is only created
    in the ``__main__`` section of this file.
    '''
    global loop

    if len(args) != 2:
        parser.print_help()
        print('ERROR: please provide the processing name and the component list')
        sys.exit(1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir( outDir ):
        parser.print_help()
        print('ERROR: when it exists, first argument must be a directory.')
        sys.exit(2)
    cfgFileName = args[1]
    if not os.path.isfile( cfgFileName ):
        parser.print_help()
        print('ERROR: second argument must be an existing file (your input cfg).')
        sys.exit(3)

    # The module name is also the sys.modules key given to the workers.
    cfgModuleName = 'PhysicsTools.HeppyCore.__cfg_to_run__'
    with open( cfgFileName, 'r' ) as cfgFile:
        cfg = imp.load_source( cfgModuleName, cfgFileName, cfgFile)

    selComps = [comp for comp in cfg.config.components if len(comp.files)>0]
    selComps = split(selComps)
    if not selComps:
        # Nothing to run: stop before creating the output directory.
        print('ERROR: no component with input files found in the configuration.')
        sys.exit(4)
    for comp in selComps:
        print(comp)
    if len(selComps)>10:
        print("WARNING: too many threads {tnum}, will just use a maximum of 10.".format(tnum=len(selComps)))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)
    if len(selComps)>1:
        shutil.copy( cfgFileName, outDir )
        pool = Pool(processes=min(len(selComps),10))
        ## workaround for a scoping problem in ipython+multiprocessing
        import PhysicsTools.HeppyCore.framework.heppy as ML
        for comp in selComps:
            print('submitting ' + str(comp.name))
            pool.apply_async( ML.runLoopAsync, [comp, outDir, cfgModuleName, options],
                              callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # when running only one loop, do not use multiprocessor module.
        # then, the exceptions are visible -> use only one sample for testing
        loop = runLoop( selComps[0], outDir, cfg.config, options )
171 
172 
173 
if __name__ == '__main__':
    from optparse import OptionParser

    # Script entry point: declare the command-line options, parse them and
    # hand over to main().  ``parser`` stays a module-level name because
    # main() uses it to print the help text.
    parser = OptionParser()
    parser.usage = """
 %prog <name> <analysis_cfg>
 For each component, start a Loop.
 'name' is whatever you want.
 """

    # (flags, settings) pairs, registered in this order.
    optionTable = [
        (("-N", "--nevents"),
         dict(dest="nevents",
              help="number of events to process",
              default=None)),
        (("-p", "--nprint"),
         dict(dest="nprint",
              help="number of events to print at the beginning",
              default=5)),
        (("-e", "--iEvent"),
         dict(dest="iEvent",
              help="jump to a given event. ignored in multiprocessing.",
              default=None)),
        (("-f", "--force"),
         dict(dest="force",
              action='store_true',
              help="don't ask questions in case output directory already exists.",
              default=False)),
        (("-i", "--interactive"),
         dict(dest="interactive",
              action='store_true',
              help="stay in the command line prompt instead of exiting",
              default=False)),
        (("-t", "--timereport"),
         dict(dest="timeReport",
              action='store_true',
              help="Make a report of the time used by each analyzer",
              default=False)),
    ]
    for flags, settings in optionTable:
        parser.add_option(*flags, **settings)

    (options,args) = parser.parse_args()

    main(options, args)
    if not options.interactive:
        exit() # trigger exit also from ipython
def main
Definition: heppy.py:125
def callBack
Definition: heppy.py:28
def chunks
Definition: heppy.py:92
def runLoop
Definition: heppy.py:44
def split
Definition: heppy.py:95
T min(T a, T b)
Definition: MathUtil.h:58
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def runLoopAsync
Definition: heppy.py:32
Definition: main.py:1
def createOutputDir
Definition: heppy.py:65