CMS 3D CMS Logo

heppy_loop.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2014 Colin Bernet
3 # https://github.com/cbernet/heppy/blob/master/LICENSE
4 
5 from __future__ import print_function
6 from builtins import range
7 import os
8 import shutil
9 import glob
10 import sys
11 import imp
12 import copy
13 from multiprocessing import Pool
14 from pprint import pprint
15 
# Import ROOT in batch mode unless "-i" (interactive) is among the options.
# sys.argv is temporarily replaced by a batch-only argument list around the
# import (presumably because ROOT inspects sys.argv when it is imported --
# confirm against the PyROOT documentation). The try/finally guarantees that
# the real command line is restored even if the import or SetBatch raises.
if "-i" not in sys.argv:
    oldv = sys.argv[:]
    sys.argv = ["-b-"]
    try:
        import ROOT
        ROOT.gROOT.SetBatch(True)
    finally:
        sys.argv = oldv
23 
24 
25 from PhysicsTools.HeppyCore.framework.looper import Looper
26 
# Module-level handle on the loop object. main() rebinds it (via `global loop`)
# to the value returned by runLoop() when a single component is processed, so
# that it can be inspected interactively afterwards. It stays None otherwise.
loop = None
29 
def callBack(result):
    """Report that an asynchronous job has finished.

    ``result`` is converted to a string and printed; nothing is returned.
    """
    message = str(result)
    print('production done:', message)
33 
def runLoopAsync(comp, outDir, configName, options):
    """Run the loop for one component; meant to be called in a worker process.

    The configuration is taken from the ``config`` attribute of the module
    registered in ``sys.modules`` under ``configName`` and is shallow-copied
    before being passed to ``runLoop``.

    Returns the ``name`` attribute of the object returned by ``runLoop``.
    On any exception, the component name, the component itself and the stack
    trace are printed, and the exception is re-raised.
    """
    try:
        config = copy.copy(sys.modules[configName].config)
        return runLoop(comp, outDir, config, options).name
    except Exception:
        from traceback import format_exc
        print("ERROR processing component %s" % comp.name)
        print(comp)
        print("STACK TRACE: ")
        print(format_exc())
        raise
45 
def runLoop(comp, outDir, config, options):
    """Create a Looper for a single component and run it.

    ``config.components`` is overwritten with ``[comp]`` and the Looper is
    named ``outDir/comp.name``. If ``options.iEvent`` is None, the full loop
    is run and then written; otherwise only the event with index
    ``int(options.iEvent)`` is processed and nothing is written.

    Returns the Looper instance.
    """
    looperName = '/'.join([outDir, comp.name])
    config.components = [comp]
    looper = Looper(looperName,
                    config,
                    options.nevents, 0,
                    nPrint=options.nprint,
                    timeReport=options.timeReport,
                    quiet=options.quiet)
    if options.iEvent is not None:
        # Single-event mode: process just the requested event.
        looper.process(int(options.iEvent))
        return looper
    looper.loop()
    looper.write()
    return looper
66 
67 
def createOutputDir(dir, components, force):
    '''Create the output directory, dealing with the case where it exists.

    dir        -- path of the directory to create.
    components -- objects with a ``name`` attribute; their names are printed
                  next to the existing sub-directories to help the user
                  decide whether to continue.
    force      -- when True, an existing directory is reused without asking.

    Returns True if processing may continue (directory created, force mode,
    or the user answered yes) and False if the user answered no.

    Raises OSError if the directory could not be created for a reason other
    than already existing (e.g. missing parent directory, permissions).
    '''
    try:
        os.mkdir(dir)
        return True
    except OSError:
        # mkdir can fail for many reasons; only "already a directory" can be
        # handled here, anything else is a genuine error.
        if not os.path.isdir(dir):
            raise

    print('directory %s already exists' % dir)
    print('contents: ')
    dirlist = [path for path in os.listdir(dir)
               if os.path.isdir('/'.join([dir, path]))]
    pprint(dirlist)
    print('component list: ')
    print([comp.name for comp in components])

    if force is True:
        print('force mode, continue.')
        return True

    # raw_input only exists on Python 2; on Python 3 the equivalent is input.
    try:
        ask = raw_input
    except NameError:
        ask = input

    answer = None
    while answer not in ['Y', 'y', 'yes', 'N', 'n', 'no']:
        answer = ask('Continue? [y/n]')
    # Every accepted answer starts with either 'y' or 'n' (case-insensitive).
    return answer.lower().startswith('y')
94 
def chunks(l, n):
    """Return the consecutive slices of ``l`` of length ``n``, as a list.

    The last slice is shorter when ``len(l)`` is not a multiple of ``n``.
    """
    pieces = []
    for start in range(0, len(l), n):
        stop = start + n
        pieces.append(l[start:stop])
    return pieces
97 
def split(comps):
    """Expand components into the sub-components to be processed separately.

    For each component in ``comps``:
      * if it has ``fineSplitFactor > 1``: one deep copy is made per
        (file, sub-chunk) pair, with ``files`` set to that single file and
        ``fineSplit`` set to ``(sub-chunk index, fineSplitFactor)``;
      * else if it has ``splitFactor > 1``: its files are cut into consecutive
        groups of ceil(len(files) / splitFactor) files (so at most
        ``splitFactor`` groups), and one deep copy is made per group;
      * otherwise the component itself is passed through, uncopied.

    Every copy is renamed ``<name>_Chunk<index>``, the index counting the
    copies made from that component. Returns the flat list of components.
    """
    splitComps = []
    for comp in comps:
        if hasattr(comp, 'fineSplitFactor') and comp.fineSplitFactor > 1:
            subchunks = list(range(comp.fineSplitFactor))
            pairs = [(f, i) for f in comp.files for i in subchunks]
            for ichunk, (fileName, isub) in enumerate(pairs):
                newComp = copy.deepcopy(comp)
                newComp.files = [fileName]
                newComp.fineSplit = (isub, comp.fineSplitFactor)
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                            index=ichunk)
                splitComps.append(newComp)
        elif hasattr(comp, 'splitFactor') and comp.splitFactor > 1:
            nFiles = len(comp.files)
            # Integer ceiling division: plain "/" yields a float on Python 3,
            # which cannot be used as a range step or slice bound.
            chunkSize, remainder = divmod(nFiles, comp.splitFactor)
            if remainder:
                chunkSize += 1
            # Same slicing as chunks(comp.files, chunkSize).
            for ichunk, start in enumerate(range(0, nFiles, chunkSize)):
                newComp = copy.deepcopy(comp)
                newComp.files = comp.files[start:start + chunkSize]
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                            index=ichunk)
                splitComps.append(newComp)
        else:
            splitComps.append(comp)
    return splitComps
125 
126 
# Module-wide store of named options, filled by setHeppyOption (and by main
# from options.extraOptions) and read back with getHeppyOption.
_heppyGlobalOptions = {}


def getHeppyOption(name, default=None):
    """Return the value stored under ``name``, or ``default`` if it is unset."""
    return _heppyGlobalOptions.get(name, default)


def setHeppyOption(name, value=True):
    """Store ``value`` under ``name`` in the module-wide option store."""
    _heppyGlobalOptions.update({name: value})
135 
def main(options, args, parser):
    """Process every component of a configuration file.

    options -- parsed command-line options; the attributes read here are
               verbose, extraOptions, ntasks and force (runLoop reads more).
    args    -- must be [outDir, cfgFileName]: the output directory and the
               path of the configuration file to load.
    parser  -- the option parser, used only to print the help on bad usage.

    The configuration file is loaded as a module, the components that have
    input files are selected and split, and then:
      * several components: the configuration file is copied to outDir and
        the components are processed in a multiprocessing pool of at most
        options.ntasks workers;
      * one component: it is processed in this process, so that exceptions
        stay visible, and the module-level ``loop`` is set to the result.

    Returns the module-level ``loop``.
    Exits with status 1, 2 or 3 on bad arguments, 4 if no component has input
    files, and 0 if the user declines to reuse an existing output directory.
    """
    global loop

    if len(args) != 2:
        parser.print_help()
        print('ERROR: please provide the processing name and the component list')
        sys.exit(1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir(outDir):
        parser.print_help()
        print('ERROR: when it exists, first argument must be a directory.')
        sys.exit(2)
    cfgFileName = args[1]
    if not os.path.isfile(cfgFileName):
        parser.print_help()
        print('ERROR: second argument must be an existing file (your input cfg).')
        sys.exit(3)

    if options.verbose:
        import logging
        logging.basicConfig(level=logging.INFO)

    # Propagate global options to _heppyGlobalOptions within this module
    # I have to import it explicitly, 'global' does not work since the
    # module is not set when executing the main
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key, val) = opt.split("=", 1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True

    # runLoopAsync later retrieves the configuration from sys.modules under
    # this module name.
    # NOTE(review): the imp module is deprecated in Python 3 -- confirm the
    # supported Python versions before migrating to importlib.
    with open(cfgFileName, 'r') as cfgFile:
        cfg = imp.load_source('PhysicsTools.HeppyCore.__cfg_to_run__', cfgFileName, cfgFile)

    selComps = [comp for comp in cfg.config.components if len(comp.files) > 0]
    selComps = split(selComps)
    if not selComps:
        # Nothing to run: fail with a clear message instead of crashing later.
        print('ERROR: no component with input files found in the configuration.')
        sys.exit(4)
    if len(selComps) > options.ntasks:
        print("WARNING: too many threads {tnum}, will just use a maximum of {jnum}.".format(tnum=len(selComps), jnum=options.ntasks))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)
    if len(selComps) > 1:
        shutil.copy(cfgFileName, outDir)
        pool = Pool(processes=min(len(selComps), options.ntasks))
        ## workaround for a scoping problem in ipython+multiprocessing
        import PhysicsTools.HeppyCore.framework.heppy_loop as ML
        for comp in selComps:
            print('submitting', comp.name)
            pool.apply_async(ML.runLoopAsync,
                             [comp, outDir, 'PhysicsTools.HeppyCore.__cfg_to_run__', options],
                             callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # when running only one loop, do not use multiprocessor module.
        # then, the exceptions are visible -> use only one sample for testing
        # The component is taken explicitly from the selection: a list
        # comprehension variable is not visible here on Python 3.
        loop = runLoop(selComps[0], outDir, cfg.config, options)
    return loop
def runLoopAsync(comp, outDir, configName, options)
Definition: heppy_loop.py:34
def split(comps)
Definition: heppy_loop.py:98
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def setHeppyOption(name, value=True)
Definition: heppy_loop.py:132
def runLoop(comp, outDir, config, options)
Definition: heppy_loop.py:46
T min(T a, T b)
Definition: MathUtil.h:58
def main(options, args, parser)
Definition: heppy_loop.py:136
def chunks(l, n)
Definition: heppy_loop.py:95
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def createOutputDir(dir, components, force)
Definition: heppy_loop.py:68
def getHeppyOption(name, default=None)
Definition: heppy_loop.py:129
#define str(s)
def callBack(result)
Definition: heppy_loop.py:30
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run