1 from __future__
import print_function
11 from event
import Event
18 '''The Looper creates a Setup object to hold information relevant during 19 the whole process, such as the process configuration obtained from 20 the configuration file, or services that can be used by several analyzers. 22 The user may freely attach new information to the setup object, 23 as long as this information is relevant during the whole process. 24 If the information is event specific, it should be attached to the event 29 Create a Setup object. 33 config: configuration object from the configuration file 35 services: dictionary of services indexed by service name. 36 The service name has the form classObject_instanceLabel 38 <base_heppy_path>.framework.services.tfile.TFileService_myhists 39 To find out about the service name of a given service, 40 load your configuration file in python, and print the service. 46 '''Stop all services''' 47 for service
in self.services.values():
52 """Creates a set of analyzers, and schedules the event processing.""" 61 memCheckFromEvent=-1):
62 """Handles the processing of an event sample. 63 An Analyzer is built for each Config.Analyzer present 64 in sequence. The Looper can then be used to process an event, 65 or a collection of events. 68 name : name of the Looper, will be used as the output directory name 69 config : process configuration information, see Config 70 nEvents : number of events to process. Defaults to all. 71 firstEvent : first event to process. Defaults to the first one. 72 nPrint : number of events to print at the beginning 79 self.logger.addHandler(logging.FileHandler(
'/'.
join([self.
name,
81 self.logger.propagate =
False 83 self.logger.addHandler( logging.StreamHandler(sys.stdout) )
95 if( hasattr(self.
cfg_comp,
'tree_name') ):
96 tree_name = self.cfg_comp.tree_name
97 if len(self.cfg_comp.files)==0:
98 errmsg =
'please provide at least an input file in the files attribute of this component\n' +
str(self.
cfg_comp)
99 raise ValueError( errmsg )
100 if hasattr(config,
"preprocessor")
and config.preprocessor
is not None :
102 if hasattr(self.
cfg_comp,
"options"):
103 print(self.cfg_comp.files,self.cfg_comp.options)
104 self.
events = config.events_class(self.cfg_comp.files, tree_name,options=self.cfg_comp.options)
106 self.
events = config.events_class(self.cfg_comp.files, tree_name)
107 if hasattr(self.
cfg_comp,
'fineSplit'):
108 fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
109 if fineSplitFactor > 1:
110 if len(self.cfg_comp.files) != 1:
111 raise RuntimeError(
"Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
112 totevents =
min(len(self.
events),
int(nEvents))
if (nEvents
and int(nEvents)
not in [-1,0])
else len(self.
events)
121 for cfg_serv
in config.services:
122 service = self.
_build(cfg_serv)
123 services[cfg_serv.name] = service
130 theClass = cfg.class_object
137 while True and index < 2000:
144 tmpname =
'%s_%d' % (name, index)
146 raise ValueError(
"More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
151 """Loop on a given number of events. 153 At the beginning of the loop, 154 Analyzer.beginLoop is called for each Analyzer. 155 At each event, self.process is called. 156 At the end of the loop, Analyzer.endLoop is called. 161 if nEvents
is None or int(nEvents) > len(self.
events) :
162 nEvents = len(self.
events)
164 nEvents =
int(nEvents)
167 'starting loop at event {firstEvent} '\
168 'to process {eventSize} events.'.
format(firstEvent=firstEvent,
169 eventSize=eventSize))
172 analyzer.beginLoop(self.
setup)
174 for iEv
in range(firstEvent, firstEvent+eventSize):
179 if not hasattr(self,
'start_time'):
191 print(
'Stopped loop following a UserWarning exception')
193 info = self.logger.info
194 warning = self.logger.warning
195 warning(
'number of events processed: {nEv}'.
format(nEv=iEv+1))
200 analyzer.endLoop(self.
setup)
203 warning(
"\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
204 warning(
"%9s %9s %9s %9s %6s %s" % (
"processed",
"all evts",
"time/proc",
" time/all",
" [%] ",
"analyer"))
205 warning(
"%9s %9s %9s %9s %6s %s" % (
"---------",
"--------",
"---------",
"---------",
" -----",
"-------------"))
206 sumtime = sum(rep[
'time']
for rep
in self.
timeReport)
209 timePerProcEv = rep[
'time']/(rep[
'events']-1)
if rep[
'events'] > 1
else 0
210 timePerAllEv = rep[
'time']/(allev-1)
if allev > 1
else 0
211 fracAllEv = rep[
'time']/sumtime
212 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep[
'events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
213 totPerProcEv = sumtime/(passev-1)
if passev > 1
else 0
214 totPerAllEv = sumtime/(allev-1)
if allev > 1
else 0
215 warning(
"%9s %9s %9s %9s %s" % (
"---------",
"--------",
"---------",
"---------",
"-------------"))
216 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0,
"TOTAL"))
218 if hasattr(self.
events,
'endLoop'): self.events.endLoop()
219 if hasattr(self.
config,
"preprocessor")
and self.config.preprocessor
is not None:
220 if hasattr(self.config.preprocessor,
"endLoop"):
221 self.config.preprocessor.endLoop(self.
cfg_comp)
224 """Run event processing for all analyzers in the sequence. 226 This function is called by self.loop, 227 but can also be called directly from 228 the python interpreter, to jump to a given event. 232 for i,analyzer
in enumerate(self.
analyzers):
233 if not analyzer.beginLoopCalled:
234 analyzer.beginLoop(self.
setup)
235 start = timeit.default_timer()
237 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
239 print(
"Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
241 ret = analyzer.process( self.
event )
243 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
245 print(
"Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
250 self.
timeReport[i][
'time'] += timeit.default_timer() - start
252 return (
False, analyzer.name)
254 self.logger.info( self.event.__str__() )
255 return (
True, analyzer.name)
258 """Writes all analyzers. 260 See Analyzer.Write for more information. 263 analyzer.write(self.
setup)
267 if __name__ ==
'__main__':
272 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
273 from optparse
import OptionParser
274 parser = OptionParser(usage=
'%prog cfgFileName compFileName [--options=optFile.json]')
275 parser.add_option(
'--options',dest=
'options',default=
'',help=
'options json file')
276 (options,args) = parser.parse_args()
278 if options.options!=
'':
279 jsonfilename = options.options
280 jfile = open (jsonfilename,
'r') 281 opts=json.loads(jfile.readline()) 282 for k,v
in six.iteritems(opts):
283 _heppyGlobalOptions[k]=v
287 cfgFileName = args[0]
288 pckfile = open( cfgFileName,
'r' ) 289 config = pickle.load( pckfile ) 290 comp = config.components[0] 291 events_class = config.events_class 292 elif len(args) == 2 :
293 cfgFileName = args[0]
294 file = open( cfgFileName,
'r' ) 295 cfg = imp.load_source( 'cfg', cfgFileName, file)
296 compFileName = args[1]
297 pckfile = open( compFileName,
'r' ) 298 comp = pickle.load( pckfile ) 299 cfg.config.components=[comp] 300 events_class = cfg.config.events_class 302 looper = Looper( 'Loop', cfg.config,nPrint = 5)
S & print(S &os, JobReport::InputFile const &f)
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
static std::string join(char **cmd)
def _prepareOutput(self, name)
def __init__(self, config, services)