CMS 3D CMS Logo

looper.py
Go to the documentation of this file.
1 from __future__ import print_function
2 # Copyright (C) 2014 Colin Bernet
3 # https://github.com/cbernet/heppy/blob/master/LICENSE
4 
5 import os
6 import sys
7 import imp
8 import logging
9 import pprint
10 from math import ceil
11 from event import Event
12 import timeit
13 import resource
14 import json
15 import six
16 
class Setup(object):
    '''Process-wide container created by the Looper.

    It carries what stays valid for the whole job: the process
    configuration read from the configuration file, and the services
    shared between analyzers.

    Users are free to hang additional attributes on the setup object,
    provided the information is meaningful for the whole process.
    Anything that is specific to a single event belongs on the event
    object instead.
    '''

    def __init__(self, config, services):
        '''Build a Setup object.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
        The service name has the form classObject_instanceLabel
        as in this example:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To find out about the service name of a given service,
        load your configuration file in python, and print the service.
        '''
        self.config, self.services = config, services

    def close(self):
        '''Call stop() on every registered service.'''
        registered = self.services.values()
        for running in registered:
            running.stop()
49 
50 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name : name of the Looper, used as the output directory name
               (a numeric suffix is appended if the directory cannot be
               created, see _prepareOutput)
        config : process configuration information, see Config
        nEvents : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint : events with an index below nPrint are printed
        timeReport : if true, accumulate the time spent in each analyzer
                     and print a summary at the end of loop()
        quiet : if true, the logger does not write to stdout
                (the log file in the output directory is always written)
        memCheckFromEvent : event index from which ru_maxrss is checked
                            before and after each analyzer; a negative
                            value disables the check

        Raises:
        ValueError : the first component has no input file, or no output
                     directory could be created
        RuntimeError : a component with fineSplit factor > 1 has more
                       than one file
        """

        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(
            logging.FileHandler(os.path.join(self.name, 'log.txt')))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Must be a real list: the analyzers are iterated several times
        # (time report, beginLoop, process, endLoop, write) and on
        # Python 3 a bare map() would be exhausted after the first pass.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        if timeReport:
            self.timeReport = [{'time': 0.0, 'events': 0}
                               for _ in self.analyzers]
        else:
            self.timeReport = False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = None
        if hasattr(self.cfg_comp, 'tree_name'):
            tree_name = self.cfg_comp.tree_name
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        if hasattr(config, "preprocessor") and config.preprocessor is not None:
            # the preprocessor returns the component to actually loop on
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, "options"):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # nEvents of None, 0 or -1 means "all events" here
                if nEvents and int(nEvents) not in [-1, 0]:
                    totevents = min(len(self.events), int(nEvents))
                else:
                    totevents = len(self.events)
                # each fine-split job gets an equal share, the last one
                # takes whatever is left
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    self.nEvents = totevents - self.firstEvent
        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            services[cfg_serv.name] = self._build(cfg_serv)
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object for this component and output dir."""
        theClass = cfg.class_object
        return theClass(cfg, self.cfg_comp, self.outDir)

    def _prepareOutput(self, name):
        """Create the output directory and return its actual name.

        Tries name, then name_1, name_2, ... until os.mkdir succeeds.
        Any OSError (existing directory, permissions, ...) triggers the
        next attempt; after 2000 failed attempts a ValueError is raised.
        """
        for index in range(2000):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir(tmpname)
            except OSError:
                continue
            return tmpname
        raise ValueError( "More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called, the optional
        time report is logged, and endLoop is called on the events and
        on the preprocessor when they provide it.

        A UserWarning raised while processing stops the event loop
        without aborting the end-of-loop actions.
        """
        nEvents = self.nEvents
        firstEvent = self.firstEvent
        totalEvents = len(self.events)
        if nEvents is None or int(nEvents) > totalEvents:
            nEvents = totalEvents
        else:
            nEvents = int(nEvents)
        # Never run past the end of the sample when firstEvent > 0,
        # and never ask for a negative number of events.
        eventSize = max(0, min(nEvents, totalEvents - firstEvent))
        self.logger.info(
            'starting loop at event {firstEvent} '\
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        # counts the events handed to self.process, including one that
        # is interrupted by a UserWarning
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    if not hasattr(self, 'start_time'):
                        # first report: start the clock for the rate
                        print('event', iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv-self.start_time_event)/float(timeit.default_timer() - self.start_time)))

                nProcessed += 1
                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            allev = max([x['events'] for x in self.timeReport])
            warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
            warning("%9s %9s %9s %9s %6s %s" % ("processed","all evts","time/proc", " time/all", " [%] ", "analyer"))
            warning("%9s %9s %9s %9s %6s %s" % ("---------","--------","---------", "---------", " -----", "-------------"))
            sumtime = sum(rep['time'] for rep in self.timeReport)
            passev = self.timeReport[-1]['events']
            for ana, rep in zip(self.analyzers, self.timeReport):
                timePerProcEv = rep['time']/(rep['events']-1) if rep['events'] > 1 else 0
                timePerAllEv = rep['time']/(allev-1) if allev > 1 else 0
                # sumtime is zero when nothing was timed (e.g. no event)
                fracAllEv = rep['time']/sumtime if sumtime > 0 else 0
                warning( "%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep['events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
            totPerProcEv = sumtime/(passev-1) if passev > 1 else 0
            totPerAllEv = sumtime/(allev-1) if allev > 1 else 0
            warning("%9s %9s %9s %9s %s" % ("---------","--------","---------", "---------", "-------------"))
            warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0, "TOTAL"))
            warning("")
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        if hasattr(self.config, "preprocessor") and self.config.preprocessor is not None:
            if hasattr(self.config.preprocessor, "endLoop"):
                self.config.preprocessor.endLoop(self.cfg_comp)

    def _checkMemJump(self, where, analyzer, iEv):
        """Print a message if ru_maxrss grew since the last check.

        where is 'before' or 'in' and only selects the wording of the
        message. self.memLast is updated to the current value.
        """
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s analyzer %s at event %s. RSS(before,after,difference) %s %s %s " % (where, analyzer.name, iEv, self.memLast, memNow, memNow-self.memLast))
        self.memLast = memNow

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a tuple (passed, analyzerName): (False, name) as soon as
        an analyzer returns a value equal to False, otherwise
        (True, name of the last analyzer). The name is None when the
        sequence is empty.
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        memCheck = (self.memReportFirstEvent >= 0
                    and iEv >= self.memReportFirstEvent)
        for i, analyzer in enumerate(self.analyzers):
            if not analyzer.beginLoopCalled:
                # direct call to process without going through loop()
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            if memCheck:
                self._checkMemJump('before', analyzer, iEv)
            ret = analyzer.process(self.event)
            if memCheck:
                self._checkMemJump('in', analyzer, iEv)
            if self.timeReport:
                self.timeReport[i]['events'] += 1
                # NOTE(review): the counter was just incremented, so this
                # test is always true and the first event IS timed, even
                # though the report header says it is skipped - confirm
                # the intent before changing.
                if self.timeReport[i]['events'] > 0:
                    self.timeReport[i]['time'] += timeit.default_timer() - start
            # '== False' on purpose: None (no explicit return) must pass
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        # with an empty sequence there is no last analyzer to name
        lastName = analyzer.name if self.analyzers else None
        return (True, lastName)

    def write(self):
        """Writes all analyzers.

        Calls write(setup) on each analyzer, then closes the setup,
        which stops all services.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
265 
266 
if __name__ == '__main__':
    # Command-line entry point:
    #   looper.py pickledConfig                 [--options=optFile.json]
    #   looper.py cfgFileName pickledComponent  [--options=optFile.json]
    # With one argument the whole configuration is unpickled; with two,
    # the configuration module is loaded from source and its component
    # list is replaced by the single unpickled component.

    import pickle
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser

    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options',dest='options',default='',help='options json file')
    (options,args) = parser.parse_args()

    if options.options != '':
        # the options are expected on the first line of the json file
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    # SECURITY: pickle.load executes arbitrary code from the file;
    # only pass files produced by a trusted heppy job.
    if len(args) == 1:
        # pickles must be read in binary mode (required on Python 3)
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        cfgFileName = args[0]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        with open(args[1], 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        # exits with the usage message instead of failing later
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
305 
def loop(self)
Definition: looper.py:150
static const TGPicture * info(bool iBackgroundIsBlack)
memReportFirstEvent
Definition: looper.py:92
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
Definition: looper.py:61
def close(self)
Definition: looper.py:45
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def write(self)
Definition: looper.py:257
T min(T a, T b)
Definition: MathUtil.h:58
def _build(self, cfg)
Definition: looper.py:129
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def process(self, iEv)
Definition: looper.py:223
def _prepareOutput(self, name)
Definition: looper.py:134
def __init__(self, config, services)
Definition: looper.py:27
#define str(s)