CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
looper.py
Go to the documentation of this file.
1 # Copyright (C) 2014 Colin Bernet
2 # https://github.com/cbernet/heppy/blob/master/LICENSE
3 
4 import os
5 import sys
6 import imp
7 import logging
8 import pprint
9 from math import ceil
10 from event import Event
11 import timeit
12 
class Setup(object):
    '''Process-wide context built by the Looper.

    A Setup instance carries whatever must stay available for the whole
    duration of the process: the process configuration read from the
    configuration file, and the services shared between analyzers.

    Users are free to attach additional attributes to the setup object,
    provided the information stays relevant for the whole process.
    Anything specific to a single event belongs on the event object
    instead.
    '''

    def __init__(self, config, services):
        '''
        Build a Setup object.

        parameters:

        config: configuration object obtained from the configuration file

        services: dictionary mapping a service name to the service.
          Service names have the form classObject_instanceLabel,
          for example:
            <base_heppy_path>.framework.services.tfile.TFileService_myhists
          To discover the name of a given service, load your configuration
          file in python and print the service.
        '''
        self.services = services
        self.config = config

    def close(self):
        '''Stop every registered service.'''
        for running_service in self.services.values():
            running_service.stop()
45 
46 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name       : name of the Looper, will be used as the output directory
                     name (a numeric suffix is appended if it already exists)
        config     : process configuration information, see Config
        nEvents    : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint     : number of events to print at the beginning
        timeReport : if true, accumulate per-analyzer timing and print a
                     report at the end of loop()
        quiet      : if true, log only to <outDir>/log.txt and not to stdout

        Raises:
        ValueError   : the component has no input file, or no output
                       directory could be created
        RuntimeError : fineSplit > 1 requested on a multi-file component
        """

        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(
            logging.FileHandler(os.path.join(self.name, 'log.txt')))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # A real list is required: the analyzers are iterated several times
        # (beginLoop, every event, endLoop, write), which a lazy map object
        # would not survive.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        if timeReport:
            # one accumulator per analyzer, in the same order as self.analyzers
            self.timeReport = [{'time': 0.0, 'events': 0}
                               for _ in self.analyzers]
        else:
            self.timeReport = False
        tree_name = getattr(self.cfg_comp, 'tree_name', None)
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        if getattr(config, 'preprocessor', None) is not None:
            # the preprocessor returns the component to be used from now on
            self.cfg_comp = config.preprocessor.run(self.cfg_comp,
                                                    self.outDir,
                                                    firstEvent,
                                                    nEvents)
        if hasattr(self.cfg_comp, 'options'):
            print('%s %s' % (self.cfg_comp.files, self.cfg_comp.options))
            self.events = config.events_class(self.cfg_comp.files,
                                              tree_name,
                                              options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # nEvents of None, 0 or -1 all mean "every event in the file"
                if nEvents and int(nEvents) not in [-1, 0]:
                    totevents = min(len(self.events), int(nEvents))
                else:
                    totevents = len(self.events)
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    # last (possibly shorter) slice; never go negative when
                    # the slice starts beyond the end of the sample
                    self.nEvents = max(totevents - self.firstEvent, 0)
        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            services[cfg_serv.name] = self._build(cfg_serv)
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate the class referenced by cfg.class_object.

        The object is constructed with the configuration entry itself, the
        component being processed and the output directory.
        """
        theClass = cfg.class_object
        return theClass(cfg, self.cfg_comp, self.outDir)

    def _prepareOutput(self, name):
        """Create the output directory and return the name actually used.

        Tries name, then name_1, name_2, ... up to 2000 attempts in total,
        moving on to the next candidate whenever os.mkdir raises OSError.

        Raises ValueError if every attempt failed.
        """
        maxAttempts = 2000
        for index in range(maxAttempts):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir(tmpname)
            except OSError:
                continue
            return tmpname
        raise ValueError( "More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        The loop never runs past the end of the input: the number of
        events is capped at len(self.events) - self.firstEvent.
        A UserWarning raised while processing stops the loop early;
        endLoop and the reports are still executed.
        """
        firstEvent = self.firstEvent
        # events that actually exist at or after firstEvent
        available = max(len(self.events) - firstEvent, 0)
        if self.nEvents is None:
            eventSize = available
        else:
            eventSize = min(int(self.nEvents), available)
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        # number of events handed to self.process, including one that was
        # interrupted by a UserWarning
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    # progress printout; the rate is measured from the first
                    # printout made by this Looper instance
                    if not hasattr(self, 'start_time'):
                        print('event %d' % iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv-self.start_time_event)/float(timeit.default_timer() - self.start_time)))

                nProcessed += 1
                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        info('number of events processed: {nEv}'.format(nEv=nProcessed))
        info('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            self._logTimeReport()

    def _logTimeReport(self):
        """Log the per-analyzer timing table accumulated in self.timeReport."""
        warning = self.logger.warning
        allev = max([x['events'] for x in self.timeReport])
        warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
        warning("%9s %9s %9s %9s %6s %s" % ("processed","all evts","time/proc", " time/all", " [%] ", "analyer"))
        warning("%9s %9s %9s %9s %6s %s" % ("---------","--------","---------", "---------", " -----", "-------------"))
        sumtime = sum(rep['time'] for rep in self.timeReport)
        # events that reached the last analyzer of the sequence
        passev = self.timeReport[-1]['events']
        for ana, rep in zip(self.analyzers, self.timeReport):
            timePerProcEv = rep['time']/(rep['events']-1) if rep['events'] > 1 else 0
            timePerAllEv = rep['time']/(allev-1) if allev > 1 else 0
            # sumtime is zero when no timed event was recorded
            fracAllEv = rep['time']/sumtime if sumtime > 0 else 0
            warning( "%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep['events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
        totPerProcEv = sumtime/(passev-1) if passev > 1 else 0
        totPerAllEv = sumtime/(allev-1) if allev > 1 else 0
        warning("%9s %9s %9s %9s %s" % ("---------","--------","---------", "---------", "-------------"))
        warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0, "TOTAL"))
        warning("")

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a tuple (status, analyzerName): status is False as soon as
        an analyzer returns False, in which case analyzerName is the name of
        that analyzer and the remaining analyzers are skipped. Otherwise
        status is True and analyzerName is the name of the last analyzer
        (None if the sequence is empty).
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        lastName = None
        for i, analyzer in enumerate(self.analyzers):
            lastName = analyzer.name
            if not analyzer.beginLoopCalled:
                # direct call from the interpreter, without going through loop
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            ret = analyzer.process(self.event)
            if self.timeReport:
                report = self.timeReport[i]
                report['events'] += 1
                # The first event seen by an analyzer is counted but not
                # timed: the report divides by (events - 1) and announces
                # that the first event is skipped.
                if report['events'] > 1:
                    report['time'] += timeit.default_timer() - start
            # equality (not identity) is kept on purpose so that any value
            # comparing equal to False still stops the sequence
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        return (True, lastName)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        Once every analyzer has been written, the services are stopped
        through self.setup.close().
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
243 
244 
if __name__ == '__main__':
    # Command-line entry point, two accepted forms:
    #   looper.py <config.pck>
    #       a single pickle holding the whole process configuration
    #   looper.py <cfg.py> <component.pck>
    #       a python configuration file exposing a `config` object, plus a
    #       pickled component that replaces config.components

    import pickle
    import sys
    import os

    # NOTE(review): pickle.load and imp.load_source both execute code taken
    # from the given files; only run this on files you trust.
    if len(sys.argv) == 2:
        cfgFileName = sys.argv[1]
        # pickles are binary data, hence 'rb'
        with open(cfgFileName, 'rb') as pckfile:
            config = pickle.load(pckfile)
        comp = config.components[0]
    elif len(sys.argv) == 3:
        cfgFileName = sys.argv[1]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        compFileName = sys.argv[2]
        with open(compFileName, 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        sys.exit('usage: looper.py <config.pck> | looper.py <cfg.py> <component.pck>')
    events_class = config.events_class

    # both branches bind `config`, so the Looper no longer depends on which
    # command-line form was used
    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
def _prepareOutput
Definition: looper.py:126
static const TGPicture * info(bool iBackgroundIsBlack)
def close
Definition: looper.py:41
def __init__
Definition: looper.py:56
def __init__
Definition: looper.py:23
def _build
Definition: looper.py:121
T min(T a, T b)
Definition: MathUtil.h:58
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
list object
Definition: dbtoconf.py:77
def process
Definition: looper.py:211
if(conf.exists("allCellsPositionCalc"))