10 from event
import Event
16 '''The Looper creates a Setup object to hold information relevant during
17 the whole process, such as the process configuration obtained from
18 the configuration file, or services that can be used by several analyzers.
20 The user may freely attach new information to the setup object,
21 as long as this information is relevant during the whole process.
22 If the information is event specific, it should be attached to the event
27 Create a Setup object.
31 config: configuration object from the configuration file
33 services: dictionary of services indexed by service name.
34 The service name has the form classObject_instanceLabel
36 <base_heppy_path>.framework.services.tfile.TFileService_myhists
37 To find out about the service name of a given service,
38 load your configuration file in python, and print the service.
44 '''Stop all services'''
45 for service
in self.services.values():
50 """Creates a set of analyzers, and schedules the event processing."""
59 memCheckFromEvent=-1):
60 """Handles the processing of an event sample.
61 An Analyzer is built for each Config.Analyzer present
62 in sequence. The Looper can then be used to process an event,
63 or a collection of events.
66 name : name of the Looper, will be used as the output directory name
67 config : process configuration information, see Config
68 nEvents : number of events to process. Defaults to all.
69 firstEvent : first event to process. Defaults to the first one.
70 nPrint : number of events to print at the beginning
77 self.logger.addHandler(logging.FileHandler(
'/'.
join([self.
name,
79 self.logger.propagate =
False
81 self.logger.addHandler( logging.StreamHandler(sys.stdout) )
94 tree_name = self.cfg_comp.tree_name
95 if len(self.cfg_comp.files)==0:
96 errmsg =
'please provide at least an input file in the files attribute of this component\n' + str(self.
cfg_comp)
97 raise ValueError( errmsg )
98 if hasattr(config,
"preprocessor")
and config.preprocessor
is not None :
100 if hasattr(self.
cfg_comp,
"options"):
101 print self.cfg_comp.files,self.cfg_comp.options
102 self.
events = config.events_class(self.cfg_comp.files, tree_name,options=self.cfg_comp.options)
104 self.
events = config.events_class(self.cfg_comp.files, tree_name)
105 if hasattr(self.
cfg_comp,
'fineSplit'):
106 fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
107 if fineSplitFactor > 1:
108 if len(self.cfg_comp.files) != 1:
109 raise RuntimeError(
"Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
110 totevents =
min(len(self.
events),int(nEvents))
if (nEvents
and int(nEvents)
not in [-1,0])
else len(self.
events)
111 self.
nEvents = int(ceil(totevents/float(fineSplitFactor)))
119 for cfg_serv
in config.services:
120 service = self.
_build(cfg_serv)
121 services[cfg_serv.name] = service
128 theClass = cfg.class_object
135 while True and index < 2000:
142 tmpname =
'%s_%d' % (name, index)
144 raise ValueError(
"More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
149 """Loop on a given number of events.
151 At the beginning of the loop,
152 Analyzer.beginLoop is called for each Analyzer.
153 At each event, self.process is called.
154 At the end of the loop, Analyzer.endLoop is called.
159 if nEvents
is None or int(nEvents) > len(self.
events) :
160 nEvents = len(self.
events)
162 nEvents = int(nEvents)
165 'starting loop at event {firstEvent} '\
166 'to process {eventSize} events.'.
format(firstEvent=firstEvent,
167 eventSize=eventSize))
168 self.logger.info( str( self.
cfg_comp ) )
170 analyzer.beginLoop(self.
setup)
172 for iEv
in range(firstEvent, firstEvent+eventSize):
177 if not hasattr(self,
'start_time'):
189 print 'Stopped loop following a UserWarning exception'
191 info = self.logger.info
192 warning = self.logger.warning
193 warning(
'number of events processed: {nEv}'.
format(nEv=iEv+1))
198 analyzer.endLoop(self.
setup)
201 warning(
"\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
202 warning(
"%9s %9s %9s %9s %6s %s" % (
"processed",
"all evts",
"time/proc",
" time/all",
" [%] ",
"analyer"))
203 warning(
"%9s %9s %9s %9s %6s %s" % (
"---------",
"--------",
"---------",
"---------",
" -----",
"-------------"))
204 sumtime = sum(rep[
'time']
for rep
in self.
timeReport)
207 timePerProcEv = rep[
'time']/(rep[
'events']-1)
if rep[
'events'] > 1
else 0
208 timePerAllEv = rep[
'time']/(allev-1)
if allev > 1
else 0
209 fracAllEv = rep[
'time']/sumtime
210 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep[
'events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
211 totPerProcEv = sumtime/(passev-1)
if passev > 1
else 0
212 totPerAllEv = sumtime/(allev-1)
if allev > 1
else 0
213 warning(
"%9s %9s %9s %9s %s" % (
"---------",
"--------",
"---------",
"---------",
"-------------"))
214 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0,
"TOTAL"))
216 if hasattr(self.
events,
'endLoop'): self.events.endLoop()
217 if hasattr(self.
config,
"preprocessor")
and self.config.preprocessor
is not None:
218 if hasattr(self.config.preprocessor,
"endLoop"):
219 self.config.preprocessor.endLoop(self.
cfg_comp)
222 """Run event processing for all analyzers in the sequence.
224 This function is called by self.loop,
225 but can also be called directly from
226 the python interpreter, to jump to a given event.
230 for i,analyzer
in enumerate(self.
analyzers):
231 if not analyzer.beginLoopCalled:
232 analyzer.beginLoop(self.
setup)
233 start = timeit.default_timer()
235 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
237 print "Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast)
239 ret = analyzer.process( self.
event )
241 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
243 print "Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast)
248 self.
timeReport[i][
'time'] += timeit.default_timer() - start
250 return (
False, analyzer.name)
252 self.logger.info( self.event.__str__() )
253 return (
True, analyzer.name)
256 """Writes all analyzers.
258 See Analyzer.Write for more information.
261 analyzer.write(self.
setup)
265 if __name__ ==
'__main__':
270 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
271 from optparse
import OptionParser
272 parser = OptionParser(usage=
'%prog cfgFileName compFileName [--options=optFile.json]')
273 parser.add_option(
'--options',dest=
'options',default=
'',help=
'options json file')
274 (options,args) = parser.parse_args()
276 if options.options!=
'':
277 jsonfilename = options.options
278 jfile = open (jsonfilename,
'r')
279 opts=json.loads(jfile.readline())
280 for k,v
in opts.iteritems():
281 _heppyGlobalOptions[k]=v
285 cfgFileName = args[0]
286 pckfile = open( cfgFileName,
'r' )
287 config = pickle.load( pckfile )
288 comp = config.components[0]
289 events_class = config.events_class
290 elif len(args) == 2 :
291 cfgFileName = args[0]
292 file = open( cfgFileName,
'r' )
293 cfg = imp.load_source( 'cfg', cfgFileName, file)
294 compFileName = args[1]
295 pckfile = open( compFileName,
'r' )
296 comp = pickle.load( pckfile )
297 cfg.config.components=[comp]
298 events_class = cfg.config.events_class
300 looper = Looper( 'Loop', cfg.config,nPrint = 5)
static std::string join(char **cmd)