CMS 3D CMS Logo

looper.py
Go to the documentation of this file.
1 # Copyright (C) 2014 Colin Bernet
2 # https://github.com/cbernet/heppy/blob/master/LICENSE
3 
4 import os
5 import sys
6 import imp
7 import logging
8 import pprint
9 from math import ceil
10 from event import Event
11 import timeit
12 import resource
13 import json
14 import six
15 
class Setup(object):
    '''Process-wide container built by the Looper.

    It carries what stays valid for the whole job: the process
    configuration read from the configuration file, and the services
    shared between analyzers.

    Users may attach further attributes to a Setup instance, provided
    the information is meaningful for the whole process. Anything that
    is specific to a single event belongs on the event object instead.
    '''

    def __init__(self, config, services):
        '''
        Build a Setup object.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
        The service name has the form classObject_instanceLabel
        as in this example:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To find out about the service name of a given service,
        load your configuration file in python, and print the service.
        '''
        self.services = services
        self.config = config

    def close(self):
        '''Call stop() on every registered service.'''
        for service_name in self.services:
            self.services[service_name].stop()
48 
49 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name : name of the Looper, will be used as the output directory
               name (a numeric suffix is appended if it already exists)
        config : process configuration information, see Config
        nEvents : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint : number of events to print at the beginning
        timeReport : if true, accumulate per-analyzer timing in process()
                     and log a summary table at the end of loop()
        quiet : if true, the logger does not get a stdout handler
                (the log.txt file handler is always added)
        memCheckFromEvent : event index from which growth of ru_maxrss
                            is printed around each analyzer call;
                            a negative value disables the check

        Raises:
        ValueError if the first component has no input file, or if no
        output directory could be created.
        RuntimeError if the component requests a fineSplit factor > 1
        while listing more than one file.
        """

        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(logging.FileHandler('/'.join([self.name,
                                                             'log.txt'])))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Must be a real list: it is iterated many times (here for the
        # time report, then in loop/process/write). On Python 3 a bare
        # map() would be exhausted after the first pass.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        if timeReport:
            # one accumulator per analyzer, same order as self.analyzers
            self.timeReport = [{'time': 0.0, 'events': 0}
                               for _ in self.analyzers]
        else:
            self.timeReport = False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = getattr(self.cfg_comp, 'tree_name', None)
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        if getattr(config, "preprocessor", None) is not None:
            # the preprocessor returns the component to be used from now on
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, "options"):
            print('%s %s' % (self.cfg_comp.files, self.cfg_comp.options))
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # nEvents of None, 0 or -1 all mean "the whole sample"
                if nEvents and int(nEvents) not in [-1, 0]:
                    totevents = min(len(self.events), int(nEvents))
                else:
                    totevents = len(self.events)
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    # last chunk(s): shrink to what is left, never negative
                    self.nEvents = max(0, totevents - self.firstEvent)
        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            services[cfg_serv.name] = self._build(cfg_serv)
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object for the current component.

        The object is constructed with (cfg, self.cfg_comp, self.outDir).
        Used for both analyzers and services.
        """
        theClass = cfg.class_object
        return theClass(cfg, self.cfg_comp, self.outDir)

    def _prepareOutput(self, name):
        """Create the output directory and return the name actually used.

        Tries name, then name_1, name_2, ... up to name_1999. Any OSError
        from os.mkdir (already existing, permission denied, ...) moves on
        to the next candidate.

        Raises ValueError when all 2000 attempts failed.
        """
        maxAttempts = 2000
        for index in range(maxAttempts):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir(tmpname)
            except OSError:
                continue
            return tmpname
        raise ValueError("More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        The event range starts at self.firstEvent and never extends past
        the end of self.events. A UserWarning raised while processing
        stops the loop; the end-of-loop actions are still performed.
        """
        firstEvent = self.firstEvent
        nAvailable = len(self.events)
        nEvents = self.nEvents
        if nEvents is None or int(nEvents) > nAvailable:
            nEvents = nAvailable
        else:
            nEvents = int(nEvents)
        # With firstEvent > 0 the requested range could run past the last
        # event; clamp it (and never go below zero).
        eventSize = max(0, min(nEvents, nAvailable - firstEvent))
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        # events for which self.process returned normally
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    if not hasattr(self, 'start_time'):
                        # first progress line: start the rate clock
                        print('event %s' % iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        elapsed = float(timeit.default_timer() - self.start_time)
                        rate = (iEv - self.start_time_event) / elapsed if elapsed > 0 else 0.0
                        print('event %d (%.1f ev/s)' % (iEv, rate))

                self.process(iEv)
                nProcessed += 1
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            allev = max([x['events'] for x in self.timeReport])
            warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
            warning("%9s %9s %9s %9s %6s %s" % ("processed", "all evts", "time/proc", " time/all", " [%] ", "analyer"))
            warning("%9s %9s %9s %9s %6s %s" % ("---------", "--------", "---------", "---------", " -----", "-------------"))
            sumtime = sum(rep['time'] for rep in self.timeReport)
            passev = self.timeReport[-1]['events']
            for ana, rep in zip(self.analyzers, self.timeReport):
                timePerProcEv = rep['time'] / (rep['events'] - 1) if rep['events'] > 1 else 0
                timePerAllEv = rep['time'] / (allev - 1) if allev > 1 else 0
                # sumtime is 0 when no event beyond the first was timed
                fracAllEv = rep['time'] / sumtime if sumtime > 0 else 0
                warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (rep['events'], allev, 1000 * timePerProcEv, 1000 * timePerAllEv, 100.0 * fracAllEv, ana.name))
            totPerProcEv = sumtime / (passev - 1) if passev > 1 else 0
            totPerAllEv = sumtime / (allev - 1) if allev > 1 else 0
            warning("%9s %9s %9s %9s %s" % ("---------", "--------", "---------", "---------", "-------------"))
            warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (passev, allev, 1000 * totPerProcEv, 1000 * totPerAllEv, 100.0, "TOTAL"))
            warning("")
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        preprocessor = getattr(self.config, "preprocessor", None)
        if preprocessor is not None and hasattr(preprocessor, "endLoop"):
            preprocessor.endLoop(self.cfg_comp)

    def _reportMemJump(self, when, analyzer, iEv):
        """Print a message if ru_maxrss grew since the previous check.

        when is the word placed before 'analyzer' in the message
        ('before' or 'in'). self.memLast is updated to the current value.
        """
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s analyzer %s at event %s. RSS(before,after,difference) %s %s %s " % (when, analyzer.name, iEv, self.memLast, memNow, memNow - self.memLast))
        self.memLast = memNow

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a (flag, analyzerName) tuple: (False, name) as soon as an
        analyzer returns False, otherwise (True, name of the last
        analyzer), with name set to None when there is no analyzer.
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        checkMem = self.memReportFirstEvent >= 0 and iEv >= self.memReportFirstEvent
        analyzer = None
        for i, analyzer in enumerate(self.analyzers):
            if not analyzer.beginLoopCalled:
                # allows calling process() directly, without loop()
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            if checkMem:
                self._reportMemJump('before', analyzer, iEv)
            ret = analyzer.process(self.event)
            if checkMem:
                self._reportMemJump('in', analyzer, iEv)
            if self.timeReport:
                self.timeReport[i]['events'] += 1
                # The first event seen by an analyzer is not timed: the
                # report divides by (events - 1) and states it is skipped.
                if self.timeReport[i]['events'] > 1:
                    self.timeReport[i]['time'] += timeit.default_timer() - start
            # '== False' on purpose: an analyzer returning None goes on
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        return (True, analyzer.name if analyzer is not None else None)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        The services held by the setup are stopped afterwards.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
264 
265 
if __name__ == '__main__':
    # Stand-alone entry point:
    #   looper.py pickledConfig [--options=optFile.json]
    #   looper.py cfgFile.py pickledComponent [--options=optFile.json]
    # Runs a Looper named 'Loop' on the first component, printing the
    # first 5 events.

    import pickle
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        # parse the whole file, so that multi-line JSON is accepted too
        with open(options.options, 'r') as jfile:
            opts = json.load(jfile)
        for k, v in six.iteritems(opts):
            _heppyGlobalOptions[k] = v

    # NOTE(security): pickle.load and imp.load_source execute code from
    # the given files; only use files from a trusted source.
    if len(args) == 1:
        # single argument: a pickled, complete configuration
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        # two arguments: python configuration file + pickled component
        cfgFileName = args[0]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        with open(args[1], 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        # parser.error prints the usage and exits
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
304 
def loop(self)
Definition: looper.py:149
static const TGPicture * info(bool iBackgroundIsBlack)
memReportFirstEvent
Definition: looper.py:91
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
Definition: looper.py:60
def close(self)
Definition: looper.py:44
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def write(self)
Definition: looper.py:256
T min(T a, T b)
Definition: MathUtil.h:58
def _build(self, cfg)
Definition: looper.py:128
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def process(self, iEv)
Definition: looper.py:222
def _prepareOutput(self, name)
Definition: looper.py:133
def __init__(self, config, services)
Definition: looper.py:26
#define str(s)