CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
looper.py
Go to the documentation of this file.
1 # Copyright (C) 2014 Colin Bernet
2 # https://github.com/cbernet/heppy/blob/master/LICENSE
3 
4 import os
5 import sys
6 import imp
7 # import copy
8 import logging
9 import pprint
10 from platform import platform
11 from math import ceil
12 from event import Event
13 import timeit
14 
class Setup(object):
    '''Process-wide container that the Looper hands to every analyzer.

    It holds information that stays valid for the whole job: the process
    configuration read from the configuration file, and the services that
    several analyzers may share.

    Users are free to attach additional attributes to a Setup instance,
    provided the information applies to the whole process. Anything that
    is specific to one event belongs on the event object instead.
    '''
    def __init__(self, config, services):
        '''
        Build the Setup.

        parameters:

        config: configuration object coming from the configuration file

        services: dictionary mapping a service name to the service object.
                  Service names are of the form classObject_instanceLabel,
                  for example:
                  <base_heppy_path>.framework.services.tfile.TFileService_myhists
                  To discover the name of a given service, load your
                  configuration file in python and print the service.
        '''
        self.services = services
        self.config = config

    def close(self):
        '''Call stop() on every registered service.'''
        for _service_name, svc in self.services.items():
            svc.stop()
48 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__( self, name,
                  config,
                  nEvents=None,
                  firstEvent=0, nPrint=0, timeReport=False ):
        """Handles the processing of an event sample.

        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name       : name of the Looper, used as the output directory name
                     (a numeric suffix is appended when the directory cannot
                     be created, see _prepareOutput)
        config     : process configuration information, see Config
        nEvents    : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint     : number of events to print at the beginning of the loop
        timeReport : if True, accumulate per-analyzer processing times,
                     printed by write()

        Raises:
        ValueError   : the first component has no input file, or no output
                       directory could be created
        RuntimeError : fineSplit > 1 is requested for a component that does
                       not have exactly one input file
        """
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger( self.name )
        self.logger.addHandler(logging.FileHandler('/'.join([self.name,
                                                             'log.txt'])))
        self.logger.addHandler( logging.StreamHandler(sys.stdout) )

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Build a real list rather than map(): under Python 3 map() returns
        # a one-shot iterator that the timeReport comprehension below would
        # exhaust, leaving no analyzers to run.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        # One {'time', 'events'} accumulator per analyzer, or False when
        # timing is disabled.
        self.timeReport = [ {'time':0.0,'events':0} for a in self.analyzers ] if timeReport else False
        tree_name = None
        if( hasattr(self.cfg_comp, 'tree_name') ):
            tree_name = self.cfg_comp.tree_name
        if len(self.cfg_comp.files)==0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError( errmsg )
        self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            # fineSplit = (index, factor): split the events of the single
            # input file into `factor` chunks of ceil(total/factor) events
            # and process only chunk number `index`.
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                totevents = min(len(self.events),int(nEvents)) if (nEvents and int(nEvents) not in [-1,0]) else len(self.events)
                self.nEvents = int(ceil(totevents/float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            service = self._build(cfg_serv)
            services[cfg_serv.name] = service
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object with (cfg, component, output dir)."""
        theClass = cfg.class_object
        obj = theClass( cfg, self.cfg_comp, self.outDir )
        return obj

    def _prepareOutput(self, name):
        """Create a new output directory and return its path.

        Tries `name`, then `name_1`, `name_2`, ... and stops at the first
        os.mkdir call that succeeds, for at most 2000 attempts.

        Raises ValueError when every attempt fails (directories already
        exist, or mkdir fails for another reason such as permissions).
        """
        tmpname = name
        for index in range(1, 2001):
            try:
                os.mkdir( tmpname )
                return tmpname
            except OSError:
                tmpname = '%s_%d' % (name, index)
        raise ValueError( "More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        Events firstEvent .. firstEvent+nEvents-1 are processed, clamped to
        the events actually available after firstEvent. A None or negative
        nEvents means "all remaining events". A UserWarning raised during
        processing stops the loop early; endLoop is still called.
        """
        firstEvent = self.firstEvent
        # Events that actually exist from firstEvent onwards; never run past
        # the end of the sample (e.g. the last fineSplit chunk).
        nAvailable = max(len(self.events) - firstEvent, 0)
        nEvents = self.nEvents
        if nEvents is None or int(nEvents) < 0 or int(nEvents) > nAvailable:
            eventSize = nAvailable
        else:
            eventSize = int(nEvents)
        self.logger.warning(
            'starting loop at event {firstEvent} '\
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.warning( str( self.cfg_comp ) )
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent+eventSize):
                if iEv%100 ==0:
                    # Progress report; the rate is measured from the first
                    # event that produced a report.
                    if not hasattr(self,'start_time'):
                        print('event %d' % iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv-self.start_time_event)/float(timeit.default_timer() - self.start_time)))

                self.process( iEv )
                nProcessed += 1
                # nPrint counts from the start of this loop, not from event 0.
                if iEv - firstEvent < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        warn = self.logger.warning
        warn('')
        warn( self.cfg_comp )
        warn('')
        # Counts events whose processing completed; an event interrupted by
        # a UserWarning is not included.
        warn('number of events processed: {nEv}'.format(nEv=nProcessed))

    def process(self, iEv ):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns (False, name) for the first analyzer whose process() returns
        a value equal to False (the remaining analyzers are skipped), and
        otherwise (True, name of the last analyzer), or (True, None) when
        the sequence is empty.
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        lastName = None
        for i,analyzer in enumerate(self.analyzers):
            lastName = analyzer.name
            if not analyzer.beginLoopCalled:
                # Same signature as the call in loop().
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            ret = analyzer.process( self.event )
            elapsed = timeit.default_timer() - start
            if self.timeReport:
                rep = self.timeReport[i]
                # The first event of each analyzer is not timed (warm-up);
                # write() divides the accumulated time by events-1 to match.
                if rep['events'] > 0:
                    rep['time'] += elapsed
                rep['events'] += 1
            if ret == False:
                return (False, analyzer.name)
        return (True, lastName)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        Then stops all services and, if enabled, prints the time report.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()

        if self.timeReport:
            allev = max(rep['events'] for rep in self.timeReport)
            print("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
            print("%9s %9s %9s %9s %s" % ("processed","all evts","time/proc", " time/all", "analyzer"))
            print("%9s %9s %9s %9s %s" % ("---------","--------","---------", "---------", "-------------"))
            for ana,rep in zip(self.analyzers,self.timeReport):
                # (events-1): the first event of each analyzer is not timed.
                perProc = 1000*rep['time']/(rep['events']-1) if rep['events']>1 else 0
                perAll = 1000*rep['time']/(allev-1) if allev > 1 else 0
                print("%9d %9d %10.2f %10.2f %s" % ( rep['events'], allev, perProc, perAll, ana.name))
            print("")
222 
if __name__ == '__main__':

    import pickle
    import sys

    # Usage:
    #   looper.py <config.pck>             pickled full configuration
    #   looper.py <config.py> <comp.pck>   python configuration file, whose
    #                                      components are replaced by the
    #                                      single pickled component
    # NOTE(review): pickle.load can execute arbitrary code; only pass files
    # produced by a trusted job-preparation step.
    if len(sys.argv) == 2 :
        cfgFileName = sys.argv[1]
        with open( cfgFileName, 'rb' ) as pckfile:
            config = pickle.load( pckfile )
    elif len(sys.argv) == 3 :
        cfgFileName = sys.argv[1]
        with open( cfgFileName, 'r' ) as cfgfile:
            cfg = imp.load_source( 'cfg', cfgFileName, cfgfile)
        compFileName = sys.argv[2]
        with open( compFileName, 'rb' ) as pckfile:
            comp = pickle.load( pckfile )
        cfg.config.components=[comp]
        config = cfg.config
    else:
        sys.exit('usage: %s <config.pck>\n   or: %s <config.py> <component.pck>'
                 % (sys.argv[0], sys.argv[0]))

    looper = Looper( 'Loop', config, nPrint = 5)
    looper.loop()
    looper.write()
def _prepareOutput
Definition: looper.py:115
def close
Definition: looper.py:43
tuple zip
Definition: archive.py:476
def __init__
Definition: looper.py:55
def __init__
Definition: looper.py:25
def _build
Definition: looper.py:110
T min(T a, T b)
Definition: MathUtil.h:58
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
list object
Definition: dbtoconf.py:77
def process
Definition: looper.py:181
def warn
Definition: __init__.py:20
if(conf.exists("allCellsPositionCalc"))