10 from event
import Event
17 '''The Looper creates a Setup object to hold information relevant during 18 the whole process, such as the process configuration obtained from 19 the configuration file, or services that can be used by several analyzers. 21 The user may freely attach new information to the setup object, 22 as long as this information is relevant during the whole process. 23 If the information is event specific, it should be attached to the event 28 Create a Setup object. 32 config: configuration object from the configuration file 34 services: dictionary of services indexed by service name. 35 The service name has the form classObject_instanceLabel 37 <base_heppy_path>.framework.services.tfile.TFileService_myhists 38 To find out about the service name of a given service, 39 load your configuration file in python, and print the service. 45 '''Stop all services''' 46 for service
in self.services.values():
51 """Creates a set of analyzers, and schedules the event processing.""" 60 memCheckFromEvent=-1):
61 """Handles the processing of an event sample. 62 An Analyzer is built for each Config.Analyzer present 63 in sequence. The Looper can then be used to process an event, 64 or a collection of events. 67 name : name of the Looper, will be used as the output directory name 68 config : process configuration information, see Config 69 nEvents : number of events to process. Defaults to all. 70 firstEvent : first event to process. Defaults to the first one. 71 nPrint : number of events to print at the beginning 78 self.logger.addHandler(logging.FileHandler(
'/'.
join([self.
name,
80 self.logger.propagate =
False 82 self.logger.addHandler( logging.StreamHandler(sys.stdout) )
94 if( hasattr(self.
cfg_comp,
'tree_name') ):
95 tree_name = self.cfg_comp.tree_name
96 if len(self.cfg_comp.files)==0:
97 errmsg =
'please provide at least an input file in the files attribute of this component\n' +
str(self.
cfg_comp)
98 raise ValueError( errmsg )
99 if hasattr(config,
"preprocessor")
and config.preprocessor
is not None :
101 if hasattr(self.
cfg_comp,
"options"):
102 print self.cfg_comp.files,self.cfg_comp.options
103 self.
events = config.events_class(self.cfg_comp.files, tree_name,options=self.cfg_comp.options)
105 self.
events = config.events_class(self.cfg_comp.files, tree_name)
106 if hasattr(self.
cfg_comp,
'fineSplit'):
107 fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
108 if fineSplitFactor > 1:
109 if len(self.cfg_comp.files) != 1:
110 raise RuntimeError(
"Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
111 totevents =
min(len(self.
events),
int(nEvents))
if (nEvents
and int(nEvents)
not in [-1,0])
else len(self.
events)
120 for cfg_serv
in config.services:
121 service = self.
_build(cfg_serv)
122 services[cfg_serv.name] = service
129 theClass = cfg.class_object
136 while True and index < 2000:
143 tmpname =
'%s_%d' % (name, index)
145 raise ValueError(
"More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
150 """Loop on a given number of events. 152 At the beginning of the loop, 153 Analyzer.beginLoop is called for each Analyzer. 154 At each event, self.process is called. 155 At the end of the loop, Analyzer.endLoop is called. 160 if nEvents
is None or int(nEvents) > len(self.
events) :
161 nEvents = len(self.
events)
163 nEvents =
int(nEvents)
166 'starting loop at event {firstEvent} '\
167 'to process {eventSize} events.'.
format(firstEvent=firstEvent,
168 eventSize=eventSize))
171 analyzer.beginLoop(self.
setup)
173 for iEv
in range(firstEvent, firstEvent+eventSize):
178 if not hasattr(self,
'start_time'):
190 print 'Stopped loop following a UserWarning exception' 192 info = self.logger.info
193 warning = self.logger.warning
194 warning(
'number of events processed: {nEv}'.
format(nEv=iEv+1))
199 analyzer.endLoop(self.
setup)
202 warning(
"\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
203 warning(
"%9s %9s %9s %9s %6s %s" % (
"processed",
"all evts",
"time/proc",
" time/all",
" [%] ",
"analyer"))
204 warning(
"%9s %9s %9s %9s %6s %s" % (
"---------",
"--------",
"---------",
"---------",
" -----",
"-------------"))
205 sumtime = sum(rep[
'time']
for rep
in self.
timeReport)
208 timePerProcEv = rep[
'time']/(rep[
'events']-1)
if rep[
'events'] > 1
else 0
209 timePerAllEv = rep[
'time']/(allev-1)
if allev > 1
else 0
210 fracAllEv = rep[
'time']/sumtime
211 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep[
'events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
212 totPerProcEv = sumtime/(passev-1)
if passev > 1
else 0
213 totPerAllEv = sumtime/(allev-1)
if allev > 1
else 0
214 warning(
"%9s %9s %9s %9s %s" % (
"---------",
"--------",
"---------",
"---------",
"-------------"))
215 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0,
"TOTAL"))
217 if hasattr(self.
events,
'endLoop'): self.events.endLoop()
218 if hasattr(self.
config,
"preprocessor")
and self.config.preprocessor
is not None:
219 if hasattr(self.config.preprocessor,
"endLoop"):
220 self.config.preprocessor.endLoop(self.
cfg_comp)
223 """Run event processing for all analyzers in the sequence. 225 This function is called by self.loop, 226 but can also be called directly from 227 the python interpreter, to jump to a given event. 231 for i,analyzer
in enumerate(self.
analyzers):
232 if not analyzer.beginLoopCalled:
233 analyzer.beginLoop(self.
setup)
234 start = timeit.default_timer()
236 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
238 print "Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast)
240 ret = analyzer.process( self.
event )
242 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
244 print "Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast)
249 self.
timeReport[i][
'time'] += timeit.default_timer() - start
251 return (
False, analyzer.name)
253 self.logger.info( self.event.__str__() )
254 return (
True, analyzer.name)
257 """Writes all analyzers. 259 See Analyzer.Write for more information. 262 analyzer.write(self.
setup)
266 if __name__ ==
'__main__':
271 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
272 from optparse
import OptionParser
273 parser = OptionParser(usage=
'%prog cfgFileName compFileName [--options=optFile.json]')
274 parser.add_option(
'--options',dest=
'options',default=
'',help=
'options json file')
275 (options,args) = parser.parse_args()
277 if options.options!=
'':
278 jsonfilename = options.options
279 jfile = open (jsonfilename,
'r') 280 opts=json.loads(jfile.readline()) 281 for k,v
in six.iteritems(opts):
282 _heppyGlobalOptions[k]=v
286 cfgFileName = args[0]
287 pckfile = open( cfgFileName,
'r' ) 288 config = pickle.load( pckfile ) 289 comp = config.components[0] 290 events_class = config.events_class 291 elif len(args) == 2 :
292 cfgFileName = args[0]
293 file = open( cfgFileName,
'r' ) 294 cfg = imp.load_source( 'cfg', cfgFileName, file)
295 compFileName = args[1]
296 pckfile = open( compFileName,
'r' ) 297 comp = pickle.load( pckfile ) 298 cfg.config.components=[comp] 299 events_class = cfg.config.events_class 301 looper = Looper( 'Loop', cfg.config,nPrint = 5)
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
static std::string join(char **cmd)
def _prepareOutput(self, name)
def __init__(self, config, services)