CMS 3D CMS Logo

looper.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from __future__ import absolute_import
3 # Copyright (C) 2014 Colin Bernet
4 # https://github.com/cbernet/heppy/blob/master/LICENSE
5 
6 from builtins import range
7 import os
8 import sys
9 import imp
10 import logging
11 import pprint
12 from math import ceil
13 from .event import Event
14 import timeit
15 import resource
16 import json
17 
class Setup(object):
    """Process-wide container created by the Looper.

    It carries everything that stays valid for the whole run: the process
    configuration read from the configuration file, and the services that
    may be shared by several analyzers.

    Users are free to attach additional attributes to this object, provided
    the information is relevant for the whole process. Anything that is
    specific to a single event belongs on the event object instead.
    """

    def __init__(self, config, services):
        """Store the configuration and the services.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
        The service name has the form classObject_instanceLabel
        as in this example:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To find out about the service name of a given service,
        load your configuration file in python, and print the service.
        """
        self.services = services
        self.config = config

    def close(self):
        """Call stop() on every registered service."""
        for registered in self.services.values():
            registered.stop()
50 
51 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name : name of the Looper, will be used as the output directory name
        config : process configuration information, see Config
        nEvents : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint : number of events to print at the beginning
        timeReport : if true, per-analyzer time and event counters are
                     accumulated in process() and summarised at the end
                     of loop()
        quiet : if true, the logger only writes to log.txt in the output
                directory, not to stdout
        memCheckFromEvent : if >= 0, starting from this event index the
                     ru_maxrss of the process is sampled before and after
                     each analyzer and every increase is printed

        Raises:
        ValueError : if the component has no input file, or if no output
                     directory could be created
        RuntimeError : if a component with fineSplit > 1 has more than
                     one input file
        """

        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(logging.FileHandler('/'.join([self.name,
                                                             'log.txt'])))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Must be a real list: the analyzers are iterated several times
        # (time report, beginLoop, every event, endLoop, write). On
        # Python 3, map() returns a one-shot iterator that would be
        # exhausted after the first pass.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        # one {'time', 'events'} accumulator per analyzer, or False if disabled
        self.timeReport = [{'time': 0.0, 'events': 0} for a in self.analyzers] if timeReport else False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = None
        if hasattr(self.cfg_comp, 'tree_name'):
            tree_name = self.cfg_comp.tree_name
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        if hasattr(config, "preprocessor") and config.preprocessor is not None:
            # the preprocessor returns the component to be used from now on
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, "options"):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # nEvents of None, 0 or -1 means "all events in the file"
                totevents = min(len(self.events), int(nEvents)) if (nEvents and int(nEvents) not in [-1, 0]) else len(self.events)
                # this job handles the fineSplitIndex-th slice of the events
                self.nEvents = int(ceil(totevents/float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    # last slice: do not run past the end
                    self.nEvents = totevents - self.firstEvent
        # self.event is set in self.process
        self.event = None
        services = dict()
        for cfg_serv in config.services:
            service = self._build(cfg_serv)
            services[cfg_serv.name] = service
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object with (cfg, component, output dir)."""
        theClass = cfg.class_object
        obj = theClass(cfg, self.cfg_comp, self.outDir)
        return obj

    def _prepareOutput(self, name):
        """Create the output directory and return its name.

        If os.mkdir fails for `name`, the names name_1, name_2, ... are
        tried in turn. A ValueError is raised after 2000 failed attempts.
        """
        index = 0
        tmpname = name
        while index < 2000:
            try:
                os.mkdir(tmpname)
                break
            except OSError:
                index += 1
                tmpname = '%s_%d' % (name, index)
        if index == 2000:
            raise ValueError("More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
        return tmpname

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        A UserWarning raised while processing stops the event loop early;
        the end-of-loop actions are still performed.
        """
        nEvents = self.nEvents
        firstEvent = self.firstEvent
        iEv = firstEvent
        if nEvents is None or int(nEvents) > len(self.events):
            nEvents = len(self.events)
        else:
            nEvents = int(nEvents)
        eventSize = nEvents
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        try:
            for iEv in range(firstEvent, firstEvent+eventSize):
                if iEv % 100 == 0:
                    # progress printout every 100 events; the first one
                    # only starts the clock used for the rate estimate
                    if not hasattr(self, 'start_time'):
                        print('event', iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv-self.start_time_event)/float(timeit.default_timer() - self.start_time)))

                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        # NOTE(review): iEv+1 is the index following the last event seen;
        # it is an event count only when firstEvent is 0 and at least one
        # event was processed. Kept as is because the log line may be
        # parsed by other tools - confirm before changing.
        warning('number of events processed: {nEv}'.format(nEv=iEv+1))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            allev = max([x['events'] for x in self.timeReport])
            warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
            warning("%9s %9s %9s %9s %6s %s" % ("processed","all evts","time/proc", " time/all", " [%] ", "analyer"))
            warning("%9s %9s %9s %9s %6s %s" % ("---------","--------","---------", "---------", " -----", "-------------"))
            sumtime = sum(rep['time'] for rep in self.timeReport)
            passev = self.timeReport[-1]['events']
            for ana, rep in zip(self.analyzers, self.timeReport):
                timePerProcEv = rep['time']/(rep['events']-1) if rep['events'] > 1 else 0
                timePerAllEv = rep['time']/(allev-1) if allev > 1 else 0
                # sumtime is 0 when nothing was timed (e.g. no event
                # processed); avoid a ZeroDivisionError in that case
                fracAllEv = rep['time']/sumtime if sumtime > 0 else 0
                warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (rep['events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
            totPerProcEv = sumtime/(passev-1) if passev > 1 else 0
            totPerAllEv = sumtime/(allev-1) if allev > 1 else 0
            warning("%9s %9s %9s %9s %s" % ("---------","--------","---------", "---------", "-------------"))
            warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0, "TOTAL"))
            warning("")
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        if hasattr(self.config, "preprocessor") and self.config.preprocessor is not None:
            if hasattr(self.config.preprocessor, "endLoop"):
                self.config.preprocessor.endLoop(self.cfg_comp)

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a tuple (flag, analyzerName): (False, name) as soon as an
        analyzer returns False, the remaining analyzers being skipped;
        otherwise (True, name of the last analyzer).
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        for i, analyzer in enumerate(self.analyzers):
            if not analyzer.beginLoopCalled:
                # needed when process is called directly, without loop()
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            if self.memReportFirstEvent >= 0 and iEv >= self.memReportFirstEvent:
                memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                if memNow > self.memLast:
                    print("Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.memLast, memNow, memNow-self.memLast))
                self.memLast = memNow
            ret = analyzer.process(self.event)
            if self.memReportFirstEvent >= 0 and iEv >= self.memReportFirstEvent:
                memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                if memNow > self.memLast:
                    print("Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.memLast, memNow, memNow-self.memLast))
                self.memLast = memNow
            if self.timeReport:
                self.timeReport[i]['events'] += 1
                if self.timeReport[i]['events'] > 0:
                    self.timeReport[i]['time'] += timeit.default_timer() - start
            # deliberate equality test: an analyzer returning None must
            # not stop the sequence
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        return (True, analyzer.name)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        Once all analyzers are written, the services held by the setup
        are stopped.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
266 
267 
if __name__ == '__main__':
    # Stand-alone entry point. Two calling conventions are supported:
    #   looper.py config.pck            -> pickled configuration object
    #   looper.py cfg.py component.pck  -> python configuration file plus
    #                                      a pickled component
    # In both cases a Looper named 'Loop' is run on the first component.

    import pickle
    import sys
    import os
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        # the options are read from the first line of the json file
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    # NOTE: pickle.load executes arbitrary code from the file; only use
    # pickle files coming from a trusted source.
    if len(args) == 1:
        # pickles must be read in binary mode (mandatory on Python 3)
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        cfgFileName = args[0]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        with open(args[1], 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        # prints the usage message and exits
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    # `config` is defined in both branches; the original used `cfg.config`
    # here, which does not exist when a pickled configuration is given.
    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
306 
def loop(self)
Definition: looper.py:151
constexpr int32_t ceil(float num)
static const TGPicture * info(bool iBackgroundIsBlack)
memReportFirstEvent
Definition: looper.py:93
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
Definition: looper.py:62
def close(self)
Definition: looper.py:46
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def write(self)
Definition: looper.py:258
def _build(self, cfg)
Definition: looper.py:130
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def process(self, iEv)
Definition: looper.py:224
def _prepareOutput(self, name)
Definition: looper.py:135
def __init__(self, config, services)
Definition: looper.py:28
#define str(s)