1 from __future__
import print_function
2 from __future__
import absolute_import
6 from builtins
import range
13 from .event
import Event
20 '''The Looper creates a Setup object to hold information relevant during
21 the whole process, such as the process configuration obtained from
22 the configuration file, or services that can be used by several analyzers.
24 The user may freely attach new information to the setup object,
25 as long as this information is relevant during the whole process.
26 If the information is event specific, it should be attached to the event
31 Create a Setup object.
35 config: configuration object from the configuration file
37 services: dictionary of services indexed by service name.
38 The service name has the form classObject_instanceLabel
40 <base_heppy_path>.framework.services.tfile.TFileService_myhists
41 To find out about the service name of a given service,
42 load your configuration file in python, and print the service.
48 '''Stop all services'''
54 """Creates a set of analyzers, and schedules the event processing."""
63 memCheckFromEvent=-1):
64 """Handles the processing of an event sample.
65 An Analyzer is built for each Config.Analyzer present
66 in sequence. The Looper can then be used to process an event,
67 or a collection of events.
70 name : name of the Looper, will be used as the output directory name
71 config : process configuration information, see Config
72 nEvents : number of events to process. Defaults to all.
73 firstEvent : first event to process. Defaults to the first one.
74 nPrint : number of events to print at the beginning
85 self.
logger.addHandler( logging.StreamHandler(sys.stdout) )
93 self.
timeReport = [ {
'time':0.0,
'events':0}
for a
in self.
analyzers ]
if timeReport
else False
97 if( hasattr(self.
cfg_comp,
'tree_name') ):
100 errmsg =
'please provide at least an input file in the files attribute of this component\n' +
str(self.
cfg_comp)
101 raise ValueError( errmsg )
102 if hasattr(config,
"preprocessor")
and config.preprocessor
is not None :
104 if hasattr(self.
cfg_comp,
"options"):
109 if hasattr(self.
cfg_comp,
'fineSplit'):
110 fineSplitIndex, fineSplitFactor = self.
cfg_comp.fineSplit
111 if fineSplitFactor > 1:
113 raise RuntimeError(
"Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.
cfg_comp.name, self.
cfg_comp.files))
123 for cfg_serv
in config.services:
124 service = self.
_build(cfg_serv)
125 services[cfg_serv.name] = service
132 theClass = cfg.class_object
139 while True and index < 2000:
146 tmpname =
'%s_%d' % (name, index)
148 raise ValueError(
"More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
153 """Loop on a given number of events.
155 At the beginning of the loop,
156 Analyzer.beginLoop is called for each Analyzer.
157 At each event, self.process is called.
158 At the end of the loop, Analyzer.endLoop is called.
163 if nEvents
is None or int(nEvents) > len(self.
events) :
164 nEvents = len(self.
events)
166 nEvents =
int(nEvents)
169 'starting loop at event {firstEvent} '\
170 'to process {eventSize} events.'.
format(firstEvent=firstEvent,
171 eventSize=eventSize))
174 analyzer.beginLoop(self.
setup)
176 for iEv
in range(firstEvent, firstEvent+eventSize):
181 if not hasattr(self,
'start_time'):
193 print(
'Stopped loop following a UserWarning exception')
196 warning = self.
logger.warning
197 warning(
'number of events processed: {nEv}'.
format(nEv=iEv+1))
202 analyzer.endLoop(self.
setup)
205 warning(
"\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
206 warning(
"%9s %9s %9s %9s %6s %s" % (
"processed",
"all evts",
"time/proc",
" time/all",
" [%] ",
"analyer"))
207 warning(
"%9s %9s %9s %9s %6s %s" % (
"---------",
"--------",
"---------",
"---------",
" -----",
"-------------"))
208 sumtime = sum(rep[
'time']
for rep
in self.
timeReport)
211 timePerProcEv = rep[
'time']/(rep[
'events']-1)
if rep[
'events'] > 1
else 0
212 timePerAllEv = rep[
'time']/(allev-1)
if allev > 1
else 0
213 fracAllEv = rep[
'time']/sumtime
214 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep[
'events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
215 totPerProcEv = sumtime/(passev-1)
if passev > 1
else 0
216 totPerAllEv = sumtime/(allev-1)
if allev > 1
else 0
217 warning(
"%9s %9s %9s %9s %s" % (
"---------",
"--------",
"---------",
"---------",
"-------------"))
218 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0,
"TOTAL"))
220 if hasattr(self.
events,
'endLoop'): self.
events.endLoop()
221 if hasattr(self.
config,
"preprocessor")
and self.
config.preprocessor
is not None:
222 if hasattr(self.
config.preprocessor,
"endLoop"):
226 """Run event processing for all analyzers in the sequence.
228 This function is called by self.loop,
229 but can also be called directly from
230 the python interpreter, to jump to a given event.
234 for i,analyzer
in enumerate(self.
analyzers):
235 if not analyzer.beginLoopCalled:
236 analyzer.beginLoop(self.
setup)
237 start = timeit.default_timer()
239 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
241 print(
"Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
243 ret = analyzer.process( self.
event )
245 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
247 print(
"Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
252 self.
timeReport[i][
'time'] += timeit.default_timer() - start
254 return (
False, analyzer.name)
257 return (
True, analyzer.name)
260 """Writes all analyzers.
262 See Analyzer.write for more information.
265 analyzer.write(self.
setup)
269 if __name__ ==
'__main__':
274 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
275 from optparse
import OptionParser
276 parser = OptionParser(usage=
'%prog cfgFileName compFileName [--options=optFile.json]')
277 parser.add_option(
'--options',dest=
'options',default=
'',help=
'options json file')
278 (options,args) = parser.parse_args()
280 if options.options!=
'':
281 jsonfilename = options.options
282 jfile = open (jsonfilename,
'r')
283 opts=json.loads(jfile.readline())
284 for k,v
in six.iteritems(opts):
285 _heppyGlobalOptions[k]=v
289 cfgFileName = args[0]
290 pckfile = open( cfgFileName,
'r' )
291 config = pickle.load( pckfile )
292 comp = config.components[0]
293 events_class = config.events_class
294 elif len(args) == 2 :
295 cfgFileName = args[0]
296 file = open( cfgFileName,
'r' )
297 cfg = imp.load_source(
'cfg', cfgFileName, file)
298 compFileName = args[1]
299 pckfile = open( compFileName,
'r' )
300 comp = pickle.load( pckfile )
301 cfg.config.components=[comp]
302 events_class = cfg.config.events_class
304 looper =
Looper(
'Loop', cfg.config,nPrint = 5)