CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
looper.py
Go to the documentation of this file.
1 # Copyright (C) 2014 Colin Bernet
2 # https://github.com/cbernet/heppy/blob/master/LICENSE
3 
4 import os
5 import sys
6 import imp
7 import logging
8 import pprint
9 from math import ceil
10 from event import Event
11 import timeit
12 import resource
13 import json
14 
class Setup(object):
    """Process-wide container created by the Looper.

    It carries what stays valid for the whole run: the process
    configuration read from the configuration file, and the services
    shared between analyzers.

    Users may attach further attributes to a Setup instance, provided
    they are meaningful for the entire process. Anything that belongs to
    a single event should be attached to the event object instead.
    """

    def __init__(self, config, services):
        """Store the configuration and the services.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
          The service name has the form classObject_instanceLabel,
          for example:
          <base_heppy_path>.framework.services.tfile.TFileService_myhists
          To find the name of a given service, load your configuration
          file in python and print the service.
        """
        self.services = services
        self.config = config

    def close(self):
        """Call stop() on every service held by this setup."""
        for running_service in self.services.values():
            running_service.stop()
47 
48 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.

        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name    : name of the Looper, will be used as the output directory
                  name (a numeric suffix is appended if it already exists)
        config  : process configuration information, see Config
        nEvents : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint  : number of events to print at the beginning
        timeReport : if true, the time spent in each analyzer is
                  accumulated and a summary is logged at the end of loop()
        quiet   : if false, log messages are also sent to stdout
        memCheckFromEvent : if >= 0, from this event index on, the max RSS
                  of the process is sampled before and after each analyzer
                  and every increase is printed

        Raises:
        ValueError   : the component has no input file, or no output
                       directory could be created
        RuntimeError : the component requests a fineSplit factor > 1 but
                       has more than one input file
        """
        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(
            logging.FileHandler('/'.join([self.name, 'log.txt'])))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # A real list, not a lazy map object: the analyzers are iterated
        # several times (time report set-up, beginLoop, every event, ...).
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        if timeReport:
            self.timeReport = [{'time': 0.0, 'events': 0}
                               for _ in self.analyzers]
        else:
            self.timeReport = False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0

        tree_name = getattr(self.cfg_comp, 'tree_name', None)
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        # The preprocessor, when configured, returns the component that is
        # used from here on.
        if getattr(config, 'preprocessor', None) is not None:
            self.cfg_comp = config.preprocessor.run(self.cfg_comp,
                                                    self.outDir,
                                                    firstEvent,
                                                    nEvents)
        if hasattr(self.cfg_comp, 'options'):
            print('%s %s' % (self.cfg_comp.files, self.cfg_comp.options))
            self.events = config.events_class(self.cfg_comp.files,
                                              tree_name,
                                              options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)

        if hasattr(self.cfg_comp, 'fineSplit'):
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # nEvents of None, 0 or -1 means "all events" here.
                if nEvents and int(nEvents) not in (-1, 0):
                    totevents = min(len(self.events), int(nEvents))
                else:
                    totevents = len(self.events)
                # Each chunk gets ceil(total/factor) events; the last chunk
                # is shortened so that it stops at the end of the sample.
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    self.nEvents = totevents - self.firstEvent

        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            services[cfg_serv.name] = self._build(cfg_serv)
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate the class referenced by cfg.class_object.

        The object is constructed with the configuration itself, the
        current component and the output directory.
        """
        theClass = cfg.class_object
        return theClass(cfg, self.cfg_comp, self.outDir)

    def _prepareOutput(self, name):
        """Create the output directory and return its actual name.

        'name' is tried first, then 'name_1', 'name_2', ... up to
        'name_1999'. Any OSError from os.mkdir (directory exists,
        permission denied, ...) triggers the next attempt.

        Raises ValueError when all 2000 attempts failed.
        """
        maxAttempts = 2000
        for index in range(maxAttempts):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir(tmpname)
            except OSError:
                continue
            return tmpname
        raise ValueError("More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        The loop starts at self.firstEvent and never runs past the last
        available event. self.nEvents of None means all remaining events;
        a value <= 0 means no event is processed. A UserWarning raised
        while processing stops the loop early; the end-of-loop actions
        are still executed.
        """
        firstEvent = self.firstEvent
        # Events still available from firstEvent on. Clamping against this
        # (rather than the full sample length) prevents indexing past the
        # end of the sample when firstEvent > 0.
        available = max(len(self.events) - firstEvent, 0)
        if self.nEvents is None:
            eventSize = available
        else:
            eventSize = max(min(int(self.nEvents), available), 0)
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)

        # Events handed to self.process; an event interrupted by a
        # UserWarning is counted as well.
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    if not hasattr(self, 'start_time'):
                        # First progress message: start the rate clock.
                        print('event %s' % iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        elapsed = float(timeit.default_timer() - self.start_time)
                        print('event %d (%.1f ev/s)' % (iEv, (iEv - self.start_time_event) / elapsed))

                nProcessed += 1
                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            self._reportTiming()
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        preprocessor = getattr(self.config, 'preprocessor', None)
        if preprocessor is not None and hasattr(preprocessor, 'endLoop'):
            preprocessor.endLoop(self.cfg_comp)

    def _reportTiming(self):
        """Log the per-analyzer timing table accumulated in self.timeReport.

        Averages are divided by (number of events - 1) because the first
        event seen by each analyzer is not timed (see process).
        """
        warning = self.logger.warning
        allev = max(rep['events'] for rep in self.timeReport)
        warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
        warning("%9s %9s %9s %9s %6s %s" % ("processed", "all evts", "time/proc", " time/all", " [%] ", "analyer"))
        warning("%9s %9s %9s %9s %6s %s" % ("---------", "--------", "---------", "---------", " -----", "-------------"))
        sumtime = sum(rep['time'] for rep in self.timeReport)
        # Events that reached the last analyzer of the sequence.
        passev = self.timeReport[-1]['events']
        for ana, rep in zip(self.analyzers, self.timeReport):
            timePerProcEv = rep['time'] / (rep['events'] - 1) if rep['events'] > 1 else 0
            timePerAllEv = rep['time'] / (allev - 1) if allev > 1 else 0
            # No time accumulated at all (e.g. zero or one event): report 0%
            # instead of dividing by zero.
            fracAllEv = rep['time'] / sumtime if sumtime > 0 else 0.0
            warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (rep['events'], allev, 1000 * timePerProcEv, 1000 * timePerAllEv, 100.0 * fracAllEv, ana.name))
        totPerProcEv = sumtime / (passev - 1) if passev > 1 else 0
        totPerAllEv = sumtime / (allev - 1) if allev > 1 else 0
        warning("%9s %9s %9s %9s %s" % ("---------", "--------", "---------", "---------", "-------------"))
        warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (passev, allev, 1000 * totPerProcEv, 1000 * totPerAllEv, 100.0, "TOTAL"))
        warning("")

    def _checkMemory(self, when, analyzer, iEv):
        """Print a message if the max RSS grew since the last check.

        'when' is the word inserted in the message ('before' or 'in').
        self.memLast is updated with the current value in all cases.
        """
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s analyzer %s at event %s. RSS(before,after,difference) %s %s %s " % (when, analyzer.name, iEv, self.memLast, memNow, memNow - self.memLast))
        self.memLast = memNow

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a tuple (status, analyzerName):
        (False, name) as soon as an analyzer returns a value equal to
        False, name being that analyzer; otherwise (True, name) where name
        is the last analyzer of the sequence (None if the sequence is
        empty).
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        memCheck = (self.memReportFirstEvent >= 0
                    and iEv >= self.memReportFirstEvent)
        analyzer = None
        for i, analyzer in enumerate(self.analyzers):
            if not analyzer.beginLoopCalled:
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            if memCheck:
                self._checkMemory('before', analyzer, iEv)
            ret = analyzer.process(self.event)
            if memCheck:
                self._checkMemory('in', analyzer, iEv)
            if self.timeReport:
                self.timeReport[i]['events'] += 1
                # Skip the first event seen by this analyzer, as announced
                # in the report header and assumed by its (events - 1)
                # divisors.
                if self.timeReport[i]['events'] > 1:
                    self.timeReport[i]['time'] += timeit.default_timer() - start
            # Deliberately '==' and not 'is': any value equal to False
            # (e.g. 0) stops the sequence, None does not.
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        lastName = analyzer.name if analyzer is not None else None
        return (True, lastName)

    def write(self):
        """Writes all analyzers.

        Analyzer.write is called with the setup for each analyzer, then
        the setup is closed, which stops all services.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
263 
264 
if __name__ == '__main__':
    # Command-line entry point. Two invocation forms are supported:
    #   looper.py config.pck                 (pickled, complete configuration)
    #   looper.py cfg.py component.pck       (python config + pickled component)
    # An optional --options=file.json provides global heppy options.

    import pickle
    import sys
    import os
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        # Only the first line of the file is parsed as JSON.
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    # NOTE(security): pickle.load executes arbitrary code from the file;
    # only pass files produced by a trusted heppy job.
    if len(args) == 1:
        # The single argument is a pickled configuration.
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        # A python configuration file plus the pickled component to run on.
        cfgFileName = args[0]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        with open(args[1], 'rb') as pckfile:
            comp = pickle.load(pckfile)
        cfg.config.components = [comp]
        config = cfg.config
    else:
        # parser.error prints the usage and exits with status 2.
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    # 'config' is set by both branches above (the one-argument form has no
    # 'cfg' module to read it from).
    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
303 
def _prepareOutput
Definition: looper.py:132
static const TGPicture * info(bool iBackgroundIsBlack)
def close
Definition: looper.py:43
memReportFirstEvent
Definition: looper.py:90
def __init__
Definition: looper.py:59
def __init__
Definition: looper.py:25
def _build
Definition: looper.py:127
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
T min(T a, T b)
Definition: MathUtil.h:58
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
if(dp >Float(M_PI)) dp-
def process
Definition: looper.py:221