CMS 3D CMS Logo

looper.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from __future__ import absolute_import
3 # Copyright (C) 2014 Colin Bernet
4 # https://github.com/cbernet/heppy/blob/master/LICENSE
5 
6 from builtins import range
7 import os
8 import sys
9 import imp
10 import logging
11 import pprint
12 from math import ceil
13 from .event import Event
14 import timeit
15 import resource
16 import json
17 import six
18 
class Setup(object):
    """Process-wide container created by the Looper.

    It carries whatever has to stay available for the whole duration of
    the processing: the process configuration read from the configuration
    file, and the services shared between analyzers.

    Users are free to attach extra attributes to this object, provided the
    information is meaningful for the whole process. Anything that is
    specific to a single event belongs on the event object instead.
    """

    def __init__(self, config, services):
        """Store the process configuration and the services.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
        The service name has the form classObject_instanceLabel
        as in this example:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To find out about the service name of a given service,
        load your configuration file in python, and print the service.
        """
        self.config = config
        self.services = services

    def close(self):
        """Call stop() on every registered service."""
        for running_service in self.services.values():
            running_service.stop()
51 
52 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name       : name of the Looper, will be used as the output directory
                     name (a numeric suffix is appended if it already exists)
        config     : process configuration information, see Config
        nEvents    : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint     : number of events to print at the beginning
        timeReport : if true, accumulate the time spent in each analyzer
                     and log a summary table at the end of loop()
        quiet      : if false, log messages are also sent to stdout
        memCheckFromEvent : index of the first event from which jumps of
                     ru_maxrss are printed around each analyzer call;
                     a negative value disables the check

        Raises:
        ValueError   : the component has no input file, or no output
                       directory could be created
        RuntimeError : a component with a fineSplit factor > 1 has more
                       than one input file
        """

        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(logging.FileHandler('/'.join([self.name,
                                                             'log.txt'])))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Must be a real list: the analyzers are iterated several times
        # (time report setup, beginLoop, every process() call, endLoop,
        # write). A bare map() is a one-shot iterator on Python 3 and would
        # be empty after the first pass.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        # One accumulator per analyzer, or False when timing is disabled.
        if timeReport:
            self.timeReport = [{'time': 0.0, 'events': 0}
                               for _ in self.analyzers]
        else:
            self.timeReport = False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = None
        if hasattr(self.cfg_comp, 'tree_name'):
            tree_name = self.cfg_comp.tree_name
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        # The preprocessor, when configured, returns the component that is
        # used from here on (the analyzers above were built with the
        # original one).
        if hasattr(config, "preprocessor") and config.preprocessor is not None:
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, "options"):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # nEvents values of None, 0 and -1 all mean "every event".
                if nEvents and int(nEvents) not in [-1, 0]:
                    totevents = min(len(self.events), int(nEvents))
                else:
                    totevents = len(self.events)
                # Each of the fineSplitFactor slices gets an equal share;
                # the last slice is shortened so it stops at totevents.
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    self.nEvents = totevents - self.firstEvent
        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            services[cfg_serv.name] = self._build(cfg_serv)
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate the class referenced by cfg.class_object.

        The class is called with the configuration object itself, the
        current component and the output directory.
        """
        theClass = cfg.class_object
        return theClass(cfg, self.cfg_comp, self.outDir)

    def _prepareOutput(self, name):
        """Create the output directory and return its actual name.

        `name` is tried first, then name_1, name_2, ... up to name_1999.
        Any OSError from os.mkdir (existing directory, permissions, ...)
        moves on to the next candidate.

        Raises ValueError when all 2000 attempts failed.
        """
        for index in range(2000):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir(tmpname)
            except OSError:
                continue
            return tmpname
        raise ValueError("More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        A UserWarning raised while processing stops the event loop early;
        the end-of-loop actions are still carried out.
        """
        nEvents = self.nEvents
        firstEvent = self.firstEvent
        if nEvents is None or int(nEvents) > len(self.events):
            nEvents = len(self.events)
        else:
            nEvents = int(nEvents)
        eventSize = nEvents
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        # Number of events whose processing was started. Counted explicitly
        # because the event index is not a count when firstEvent > 0, and
        # because the range below may be empty.
        nStarted = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                nStarted += 1
                if iEv % 100 == 0:
                    if not hasattr(self, 'start_time'):
                        # First progress line: start the rate measurement.
                        print('event', iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv - self.start_time_event) / float(timeit.default_timer() - self.start_time)))

                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nStarted))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            self._logTimeReport()
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        if hasattr(self.config, "preprocessor") and self.config.preprocessor is not None:
            if hasattr(self.config.preprocessor, "endLoop"):
                self.config.preprocessor.endLoop(self.cfg_comp)

    def _logTimeReport(self):
        """Log the per-analyzer timing table accumulated by process().

        Only meaningful when self.timeReport is a non-empty list.
        """
        warning = self.logger.warning
        allev = max([x['events'] for x in self.timeReport])
        warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
        warning("%9s %9s %9s %9s %6s %s" % ("processed", "all evts", "time/proc", " time/all", " [%] ", "analyer"))
        warning("%9s %9s %9s %9s %6s %s" % ("---------", "--------", "---------", "---------", " -----", "-------------"))
        sumtime = sum(rep['time'] for rep in self.timeReport)
        # Events that reached the last analyzer of the sequence.
        passev = self.timeReport[-1]['events']
        for ana, rep in zip(self.analyzers, self.timeReport):
            timePerProcEv = rep['time'] / (rep['events'] - 1) if rep['events'] > 1 else 0
            timePerAllEv = rep['time'] / (allev - 1) if allev > 1 else 0
            # sumtime is 0.0 when no event was timed; avoid dividing by it.
            fracAllEv = rep['time'] / sumtime if sumtime > 0 else 0
            warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (rep['events'], allev, 1000 * timePerProcEv, 1000 * timePerAllEv, 100.0 * fracAllEv, ana.name))
        totPerProcEv = sumtime / (passev - 1) if passev > 1 else 0
        totPerAllEv = sumtime / (allev - 1) if allev > 1 else 0
        warning("%9s %9s %9s %9s %s" % ("---------", "--------", "---------", "---------", "-------------"))
        warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (passev, allev, 1000 * totPerProcEv, 1000 * totPerAllEv, 100.0, "TOTAL"))
        warning("")

    def _reportMemJump(self, where, analyzer, iEv):
        """Print a message if ru_maxrss grew since the last check.

        `where` is the word inserted in the message ('before' or 'in').
        Does nothing unless memory checking is enabled
        (memReportFirstEvent >= 0) and iEv has reached that event.
        """
        if self.memReportFirstEvent < 0 or iEv < self.memReportFirstEvent:
            return
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s analyzer %s at event %s. RSS(before,after,difference) %s %s %s " % (where, analyzer.name, iEv, self.memLast, memNow, memNow - self.memLast))
        self.memLast = memNow

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a tuple (status, analyzerName). status is False as soon
        as an analyzer returns a value equal to False, in which case the
        remaining analyzers are skipped and analyzerName is the name of
        that analyzer. Otherwise status is True and analyzerName is the
        name of the last analyzer (None if the sequence is empty).
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        analyzer = None
        for i, analyzer in enumerate(self.analyzers):
            if not analyzer.beginLoopCalled:
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            self._reportMemJump('before', analyzer, iEv)
            ret = analyzer.process(self.event)
            self._reportMemJump('in', analyzer, iEv)
            if self.timeReport:
                self.timeReport[i]['events'] += 1
                # NOTE: the counters start at 0, so this condition is always
                # true after the increment and every event is timed.
                if self.timeReport[i]['events'] > 0:
                    self.timeReport[i]['time'] += timeit.default_timer() - start
            # Deliberately '== False': a None return (no explicit return
            # statement in the analyzer) must not stop the sequence.
            if ret == False:  # noqa: E712
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        return (True, analyzer.name if analyzer is not None else None)

    def write(self):
        """Writes all analyzers.

        Calls write(setup) on each analyzer, then closes the setup, which
        stops all services.
        See Analyzer.Write for more information.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
267 
268 
if __name__ == '__main__':
    # Stand-alone entry point:
    #   looper.py pickledConfig [--options=optFile.json]
    #   looper.py cfgFile.py pickledComponent [--options=optFile.json]

    import pickle
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        # Only the first line of the options file is parsed as JSON.
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in six.iteritems(opts):
            _heppyGlobalOptions[k] = v

    # NOTE(security): pickle.load executes arbitrary code contained in the
    # file; only run this on pickle files from a trusted source.
    if len(args) == 1:
        # A single argument is a pickled, fully built configuration.
        # Pickle data is binary, so the file must be opened in 'rb' mode.
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        # Two arguments: a python configuration file, and a pickled
        # component that replaces the components of that configuration.
        cfgFileName = args[0]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        with open(args[1], 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        # parser.error prints the usage message and exits.
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
307 
def loop(self)
Definition: looper.py:152
static const TGPicture * info(bool iBackgroundIsBlack)
memReportFirstEvent
Definition: looper.py:94
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
Definition: looper.py:63
def close(self)
Definition: looper.py:47
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def write(self)
Definition: looper.py:259
T min(T a, T b)
Definition: MathUtil.h:58
def _build(self, cfg)
Definition: looper.py:131
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def process(self, iEv)
Definition: looper.py:225
def _prepareOutput(self, name)
Definition: looper.py:136
def __init__(self, config, services)
Definition: looper.py:29
#define str(s)