1 from __future__
import print_function
2 from __future__
import absolute_import
6 from builtins
import range
13 from .event
import Event
19 '''The Looper creates a Setup object to hold information relevant during
20 the whole process, such as the process configuration obtained from
21 the configuration file, or services that can be used by several analyzers.
23 The user may freely attach new information to the setup object,
24 as long as this information is relevant during the whole process.
25 If the information is event specific, it should be attached to the event
30 Create a Setup object.
34 config: configuration object from the configuration file
36 services: dictionary of services indexed by service name.
37 The service name has the form classObject_instanceLabel
39 <base_heppy_path>.framework.services.tfile.TFileService_myhists
40 To find out about the service name of a given service,
41 load your configuration file in python, and print the service.
47 '''Stop all services'''
48 for service
in self.services.values():
53 """Creates a set of analyzers, and schedules the event processing."""
62 memCheckFromEvent=-1):
63 """Handles the processing of an event sample.
64 An Analyzer is built for each Config.Analyzer present
65 in sequence. The Looper can then be used to process an event,
66 or a collection of events.
69 name : name of the Looper, will be used as the output directory name
70 config : process configuration information, see Config
71 nEvents : number of events to process. Defaults to all.
72 firstEvent : first event to process. Defaults to the first one.
73 nPrint : number of events to print at the beginning
80 self.logger.addHandler(logging.FileHandler(
'/'.
join([self.
name,
82 self.logger.propagate =
False
84 self.logger.addHandler( logging.StreamHandler(sys.stdout) )
97 tree_name = self.cfg_comp.tree_name
98 if len(self.cfg_comp.files)==0:
99 errmsg =
'please provide at least an input file in the files attribute of this component\n' +
str(self.
cfg_comp)
100 raise ValueError( errmsg )
101 if hasattr(config,
"preprocessor")
and config.preprocessor
is not None :
103 if hasattr(self.
cfg_comp,
"options"):
104 print(self.cfg_comp.files,self.cfg_comp.options)
105 self.
events = config.events_class(self.cfg_comp.files, tree_name,options=self.cfg_comp.options)
107 self.
events = config.events_class(self.cfg_comp.files, tree_name)
108 if hasattr(self.
cfg_comp,
'fineSplit'):
109 fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
110 if fineSplitFactor > 1:
111 if len(self.cfg_comp.files) != 1:
112 raise RuntimeError(
"Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
113 totevents =
min(len(self.
events),int(nEvents))
if (nEvents
and int(nEvents)
not in [-1,0])
else len(self.
events)
114 self.
nEvents = int(
ceil(totevents/float(fineSplitFactor)))
122 for cfg_serv
in config.services:
123 service = self.
_build(cfg_serv)
124 services[cfg_serv.name] = service
131 theClass = cfg.class_object
138 while True and index < 2000:
145 tmpname =
'%s_%d' % (name, index)
147 raise ValueError(
"More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
152 """Loop on a given number of events.
154 At the beginning of the loop,
155 Analyzer.beginLoop is called for each Analyzer.
156 At each event, self.process is called.
157 At the end of the loop, Analyzer.endLoop is called.
162 if nEvents
is None or int(nEvents) > len(self.
events) :
163 nEvents = len(self.
events)
165 nEvents = int(nEvents)
168 'starting loop at event {firstEvent} '\
169 'to process {eventSize} events.'.
format(firstEvent=firstEvent,
170 eventSize=eventSize))
173 analyzer.beginLoop(self.
setup)
175 for iEv
in range(firstEvent, firstEvent+eventSize):
180 if not hasattr(self,
'start_time'):
192 print(
'Stopped loop following a UserWarning exception')
194 info = self.logger.info
195 warning = self.logger.warning
196 warning(
'number of events processed: {nEv}'.
format(nEv=iEv+1))
201 analyzer.endLoop(self.
setup)
204 warning(
"\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
205 warning(
"%9s %9s %9s %9s %6s %s" % (
"processed",
"all evts",
"time/proc",
" time/all",
" [%] ",
"analyer"))
206 warning(
"%9s %9s %9s %9s %6s %s" % (
"---------",
"--------",
"---------",
"---------",
" -----",
"-------------"))
207 sumtime = sum(rep[
'time']
for rep
in self.
timeReport)
210 timePerProcEv = rep[
'time']/(rep[
'events']-1)
if rep[
'events'] > 1
else 0
211 timePerAllEv = rep[
'time']/(allev-1)
if allev > 1
else 0
212 fracAllEv = rep[
'time']/sumtime
213 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep[
'events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
214 totPerProcEv = sumtime/(passev-1)
if passev > 1
else 0
215 totPerAllEv = sumtime/(allev-1)
if allev > 1
else 0
216 warning(
"%9s %9s %9s %9s %s" % (
"---------",
"--------",
"---------",
"---------",
"-------------"))
217 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0,
"TOTAL"))
219 if hasattr(self.
events,
'endLoop'): self.events.endLoop()
220 if hasattr(self.
config,
"preprocessor")
and self.config.preprocessor
is not None:
221 if hasattr(self.config.preprocessor,
"endLoop"):
222 self.config.preprocessor.endLoop(self.
cfg_comp)
225 """Run event processing for all analyzers in the sequence.
227 This function is called by self.loop,
228 but can also be called directly from
229 the python interpreter, to jump to a given event.
233 for i,analyzer
in enumerate(self.
analyzers):
234 if not analyzer.beginLoopCalled:
235 analyzer.beginLoop(self.
setup)
236 start = timeit.default_timer()
238 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
240 print(
"Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
242 ret = analyzer.process( self.
event )
244 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
246 print(
"Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
251 self.
timeReport[i][
'time'] += timeit.default_timer() - start
253 return (
False, analyzer.name)
255 self.logger.info( self.event.__str__() )
256 return (
True, analyzer.name)
259 """Writes all analyzers.
261 See Analyzer.write for more information.
264 analyzer.write(self.
setup)
268 if __name__ ==
'__main__':
273 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
274 from optparse
import OptionParser
275 parser = OptionParser(usage=
'%prog cfgFileName compFileName [--options=optFile.json]')
276 parser.add_option(
'--options',dest=
'options',default=
'',help=
'options json file')
277 (options,args) = parser.parse_args()
279 if options.options!=
'':
280 jsonfilename = options.options
281 jfile = open (jsonfilename,
'r')
282 opts=json.loads(jfile.readline())
283 for k,v
in opts.items():
284 _heppyGlobalOptions[k]=v
288 cfgFileName = args[0]
289 pckfile = open( cfgFileName,
'r' )
290 config = pickle.load( pckfile )
291 comp = config.components[0]
292 events_class = config.events_class
293 elif len(args) == 2 :
294 cfgFileName = args[0]
295 file = open( cfgFileName,
'r' )
296 cfg = imp.load_source( 'cfg', cfgFileName, file)
297 compFileName = args[1]
298 pckfile = open( compFileName,
'r' )
299 comp = pickle.load( pckfile )
300 cfg.config.components=[comp]
301 events_class = cfg.config.events_class
303 looper = Looper( 'Loop', cfg.config,nPrint = 5)
constexpr int32_t ceil(float num)
const uint16_t range(const Frame &aFrame)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
if(conf_.getParameter< bool >("UseStripCablingDB"))
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
static std::string join(char **cmd)