CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
heppy.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2014 Colin Bernet
3 # https://github.com/cbernet/heppy/blob/master/LICENSE
4 
5 import os
6 import shutil
7 import glob
8 import sys
9 import imp
10 import copy
11 from multiprocessing import Pool
12 from pprint import pprint
13 
# Import ROOT in batch mode unless an interactive session was requested
# with -i/--interactive. The script's own arguments are hidden from ROOT
# while it is imported (presumably because ROOT inspects sys.argv on first
# import) and restored afterwards, even if the import fails.
if "-i" not in sys.argv and "--interactive" not in sys.argv:
    oldv = sys.argv[:]
    sys.argv = ["-b-"]
    try:
        import ROOT
        ROOT.gROOT.SetBatch(True)
    finally:
        sys.argv = oldv


from PhysicsTools.HeppyCore.framework.looper import Looper

# Global, to be used interactively when only one component is processed:
# main() stores here the Looper it ran when a single component is processed
# in this process (i.e. without multiprocessing).
loop = None
27 
def callBack( result ):
    '''Success callback of pool.apply_async: report the finished loop.

    ``result`` is whatever the worker returned (runLoopAsync returns the
    Looper name); it is printed after the 'production done:' label.
    '''
    print('production done: %s' % str(result))
31 
def runLoopAsync(comp, outDir, configName, options):
    '''Process one component inside a pool worker and return the Looper name.

    The configuration module is looked up in sys.modules under
    ``configName`` (main() loads it under that name before submitting
    jobs). Its ``config`` object is shallow-copied before runLoop assigns
    the component list to it.

    On any exception, the component and the full stack trace are printed,
    then the exception is re-raised.
    '''
    try:
        cfgModule = sys.modules[configName]
        looper = runLoop(comp, outDir, copy.copy(cfgModule.config), options)
        return looper.name
    except Exception:
        import traceback
        print("ERROR processing component %s" % comp.name)
        print(comp)
        print("STACK TRACE: ")
        print(traceback.format_exc())
        raise
43 
def runLoop( comp, outDir, config, options):
    '''Build a Looper for the single component ``comp`` and run it.

    ``config.components`` is overwritten with ``[comp]``, so the caller's
    config object is modified. The Looper is named ``<outDir>/<comp.name>``.
    ``options.nevents``, ``options.nprint``, ``options.timeReport`` and
    ``options.quiet`` are forwarded to the Looper.

    If ``options.iEvent`` is None, the full loop is run and its results
    written. Otherwise only event ``int(options.iEvent)`` is processed and
    write() is not called.

    Returns the Looper.
    '''
    loopName = '/'.join([outDir, comp.name])
    config.components = [comp]
    looper = Looper(loopName, config, options.nevents, 0,
                    nPrint=options.nprint,
                    timeReport=options.timeReport,
                    quiet=options.quiet)
    if options.iEvent is not None:
        # single-event mode (debugging)
        looper.process(int(options.iEvent))
        return looper
    looper.loop()
    looper.write()
    return looper
64 
65 
def createOutputDir(dir, components, force):
    '''Create the output dir, dealing with the case where dir already exists.

    Parameters:
      dir        -- path of the directory to create (its parent must exist).
      components -- components to be processed; only their ``name`` is used,
                    to show the user what will be written.
      force      -- if true, continue without asking when ``dir`` exists.

    Returns True if processing should continue, False if the user declined
    to reuse an existing directory.

    Raises OSError when ``dir`` cannot be created for any reason other than
    already existing as a directory (missing parent, no permission, existing
    regular file, ...).
    '''
    try:
        os.mkdir(dir)
        return True
    except OSError:
        # Only "already exists as a directory" is recoverable. Any other
        # failure must not be reported as an existing directory.
        if not os.path.isdir(dir):
            raise
    print('directory %s already exists' % dir)
    print('contents: ')
    dirlist = [path for path in os.listdir(dir)
               if os.path.isdir(os.path.join(dir, path))]
    pprint(dirlist)
    print('component list: ')
    print([comp.name for comp in components])
    if force:
        print('force mode, continue.')
        return True
    # raw_input only exists on Python 2; input() has its semantics on Python 3.
    try:
        ask = raw_input
    except NameError:
        ask = input
    answer = None
    while answer not in ['Y', 'y', 'yes', 'N', 'n', 'no']:
        answer = ask('Continue? [y/n]')
    # every accepted answer starts with either 'y' or 'n'
    return answer.lower().startswith('y')
92 
def chunks(l, n):
    '''Split the sequence ``l`` into consecutive slices of ``n`` items.

    The last slice may be shorter. ``n`` must be a positive integer
    (a zero step makes range() raise ValueError).
    '''
    return [l[i:i+n] for i in range(0, len(l), n)]

def split(comps):
    '''Split components into chunks that can be processed in parallel.

    For each component of ``comps``:
      * if it has ``fineSplitFactor > 1``: one chunk per (file, sub-chunk)
        pair. Each chunk holds a single file and gets
        ``fineSplit = (subchunk_index, fineSplitFactor)``.
      * elif it has ``splitFactor > 1``: its files are cut into consecutive
        groups of ceil(nfiles / splitFactor) files (hence at most
        splitFactor chunks, possibly fewer).
      * otherwise the component itself is kept unchanged.

    Chunks are deep copies of the component, named ``<name>_Chunk<index>``.
    A component that is split but has no files yields no chunk.

    Returns the new list of components.
    '''
    splitComps = []
    for comp in comps:
        if hasattr(comp, 'fineSplitFactor') and comp.fineSplitFactor > 1:
            subchunks = range(comp.fineSplitFactor)
            pairs = [(f, i) for f in comp.files for i in subchunks]
            for ichunk, (fileName, isub) in enumerate(pairs):
                newComp = copy.deepcopy(comp)
                newComp.files = [fileName]
                newComp.fineSplit = (isub, comp.fineSplitFactor)
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                           index=ichunk)
                splitComps.append(newComp)
        elif hasattr(comp, 'splitFactor') and comp.splitFactor > 1:
            # Ceiling division. Floor division (//) keeps an integer chunk
            # size on Python 3 too.
            chunkSize = len(comp.files) // comp.splitFactor
            if len(comp.files) % comp.splitFactor:
                chunkSize += 1
            # With no files chunkSize would be 0, which chunks() rejects.
            chunkSize = max(chunkSize, 1)
            for ichunk, chunk in enumerate(chunks(comp.files, chunkSize)):
                newComp = copy.deepcopy(comp)
                newComp.files = chunk
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                           index=ichunk)
                splitComps.append(newComp)
        else:
            splitComps.append(comp)
    return splitComps
123 
124 
# Extra options given with -o/--option; filled in by main(). Flags are
# stored as True, key=value pairs keep their value as a string.
_heppyGlobalOptions = {}

def getHeppyOption(name,default=None):
    '''Return the global option ``name``, or ``default`` when it was not set.'''
    return _heppyGlobalOptions.get(name, default)
130 
def main( options, args ):
    '''Run the heppy loopers on all components of a configuration file.

    Parameters:
      options -- parsed command-line options (see the OptionParser set up
                 when this file is run as a script).
      args    -- positional arguments ``[outDir, cfgFileName]``.

    Components of ``cfg.config`` without input files are ignored; the other
    ones are split with split(). With more than one resulting component,
    each is processed by runLoopAsync in a process pool of at most 10
    workers. With exactly one, it is processed in this process (so that
    exceptions are visible) and its Looper is stored in the module-level
    ``loop``.

    Calls sys.exit with status 1, 2 or 3 on invalid arguments, 4 when no
    component has input files, and 0 when the user refuses to reuse an
    existing output directory.
    '''
    global loop

    cfgModuleName = 'PhysicsTools.HeppyCore.__cfg_to_run__'
    maxProcesses = 10

    def usageError(message, status):
        # Print the usage (when available) and the message, then exit.
        # ``parser`` only exists when this file is executed as a script.
        cmdParser = globals().get('parser')
        if cmdParser is not None:
            cmdParser.print_help()
        print(message)
        sys.exit(status)

    if len(args) != 2:
        usageError('ERROR: please provide the processing name and the component list', 1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir(outDir):
        usageError('ERROR: when it exists, first argument must be a directory.', 2)
    cfgFileName = args[1]
    if not os.path.isfile(cfgFileName):
        usageError('ERROR: second argument must be an existing file (your input cfg).', 3)

    if options.verbose:
        import logging
        logging.basicConfig(level=logging.INFO)

    # Propagate global options to _heppyGlobalOptions within this module.
    # It has to be imported explicitly: 'global' does not work since the
    # module is not set when executing the main (this file then runs as
    # __main__).
    from PhysicsTools.HeppyCore.framework.heppy import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key, val) = opt.split("=", 1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True

    # Load the configuration under a fixed module name, so that
    # runLoopAsync can look it up in sys.modules; close the file afterwards.
    with open(cfgFileName, 'r') as cfgFile:
        cfg = imp.load_source(cfgModuleName, cfgFileName, cfgFile)

    selComps = split([comp for comp in cfg.config.components if len(comp.files) > 0])
    if not selComps:
        print('ERROR: no component with input files in %s' % cfgFileName)
        sys.exit(4)
    if len(selComps) > maxProcesses:
        print("WARNING: too many threads {tnum}, will just use a maximum of 10.".format(tnum=len(selComps)))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)

    if len(selComps) > 1:
        shutil.copy(cfgFileName, outDir)
        pool = Pool(processes=min(len(selComps), maxProcesses))
        ## workaround for a scoping problem in ipython+multiprocessing
        import PhysicsTools.HeppyCore.framework.heppy as ML
        for comp in selComps:
            print('submitting %s' % comp.name)
            pool.apply_async(ML.runLoopAsync, [comp, outDir, cfgModuleName, options],
                             callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # When running only one loop, do not use the multiprocessing module:
        # then the exceptions are visible -> use only one sample for testing.
        loop = runLoop(selComps[0], outDir, cfg.config, options)
192 
193 
194 
if __name__ == '__main__':
    from optparse import OptionParser

    parser = OptionParser()
    parser.usage = """
    %prog <name> <analysis_cfg>
    For each component, start a Loop.
    'name' is whatever you want.
    """

    parser.add_option("-N", "--nevents",
                      dest="nevents",
                      type="int",
                      help="number of events to process",
                      default=None)
    # type="int": otherwise a value given on the command line stays a string
    # (unlike the integer default) when passed to the Looper as nPrint.
    parser.add_option("-p", "--nprint",
                      dest="nprint",
                      type="int",
                      help="number of events to print at the beginning",
                      default=5)
    # type="int": reject a non-integer event number at parse time instead of
    # failing later in runLoop.
    parser.add_option("-e", "--iEvent",
                      dest="iEvent",
                      type="int",
                      help="jump to a given event. ignored in multiprocessing.",
                      default=None)
    parser.add_option("-f", "--force",
                      dest="force",
                      action='store_true',
                      help="don't ask questions in case output directory already exists.",
                      default=False)
    parser.add_option("-i", "--interactive",
                      dest="interactive",
                      action='store_true',
                      help="stay in the command line prompt instead of exiting",
                      default=False)
    parser.add_option("-t", "--timereport",
                      dest="timeReport",
                      action='store_true',
                      help="Make a report of the time used by each analyzer",
                      default=False)
    parser.add_option("-v", "--verbose",
                      dest="verbose",
                      action='store_true',
                      help="increase the verbosity of the output (from 'warning' to 'info' level)",
                      default=False)
    parser.add_option("-q", "--quiet",
                      dest="quiet",
                      action='store_true',
                      help="do not print log messages to screen.",
                      default=False)
    parser.add_option("-o", "--option",
                      dest="extraOptions",
                      type="string",
                      action="append",
                      default=[],
                      help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")

    (options, args) = parser.parse_args()

    main(options, args)
    if not options.interactive:
        exit() # trigger exit also from ipython
def main
Definition: heppy.py:131
def callBack
Definition: heppy.py:28
def chunks
Definition: heppy.py:93
def runLoop
Definition: heppy.py:44
def split
Definition: heppy.py:96
T min(T a, T b)
Definition: MathUtil.h:58
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def runLoopAsync
Definition: heppy.py:32
Definition: main.py:1
def createOutputDir
Definition: heppy.py:66
def getHeppyOption
Definition: heppy.py:127