1 from __future__
import print_function
2 from __future__
import absolute_import
6 from builtins
import range
13 from .event
import Event
19 '''The Looper creates a Setup object to hold information relevant during 20 the whole process, such as the process configuration obtained from 21 the configuration file, or services that can be used by several analyzers. 23 The user may freely attach new information to the setup object, 24 as long as this information is relevant during the whole process. 25 If the information is event specific, it should be attached to the event 30 Create a Setup object. 34 config: configuration object from the configuration file 36 services: dictionary of services indexed by service name. 37 The service name has the form classObject_instanceLabel 39 <base_heppy_path>.framework.services.tfile.TFileService_myhists 40 To find out about the service name of a given service, 41 load your configuration file in python, and print the service. 47 '''Stop all services''' 53 """Creates a set of analyzers, and schedules the event processing.""" 62 memCheckFromEvent=-1):
63 """Handles the processing of an event sample. 64 An Analyzer is built for each Config.Analyzer present 65 in sequence. The Looper can then be used to process an event, 66 or a collection of events. 69 name : name of the Looper, will be used as the output directory name 70 config : process configuration information, see Config 71 nEvents : number of events to process. Defaults to all. 72 firstEvent : first event to process. Defaults to the first one. 73 nPrint : number of events to print at the beginning 82 self.
logger.propagate =
False 84 self.
logger.addHandler( logging.StreamHandler(sys.stdout) )
99 errmsg =
'please provide at least an input file in the files attribute of this component\n' +
str(self.
cfg_comp)
100 raise ValueError( errmsg )
101 if hasattr(config,
"preprocessor")
and config.preprocessor
is not None :
103 if hasattr(self.
cfg_comp,
"options"):
108 if hasattr(self.
cfg_comp,
'fineSplit'):
109 fineSplitIndex, fineSplitFactor = self.
cfg_comp.fineSplit
110 if fineSplitFactor > 1:
112 raise RuntimeError(
"Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.
cfg_comp.name, self.
cfg_comp.files))
113 totevents =
min(len(self.
events),
int(nEvents))
if (nEvents
and int(nEvents)
not in [-1,0])
else len(self.
events)
122 for cfg_serv
in config.services:
123 service = self.
_build(cfg_serv)
124 services[cfg_serv.name] = service
131 theClass = cfg.class_object
138 while True and index < 2000:
145 tmpname =
'%s_%d' % (name, index)
147 raise ValueError(
"More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
152 """Loop on a given number of events. 154 At the beginning of the loop, 155 Analyzer.beginLoop is called for each Analyzer. 156 At each event, self.process is called. 157 At the end of the loop, Analyzer.endLoop is called. 162 if nEvents
is None or int(nEvents) > len(self.
events) :
163 nEvents = len(self.
events)
165 nEvents =
int(nEvents)
168 'starting loop at event {firstEvent} '\
169 'to process {eventSize} events.'.
format(firstEvent=firstEvent,
170 eventSize=eventSize))
173 analyzer.beginLoop(self.
setup)
175 for iEv
in range(firstEvent, firstEvent+eventSize):
180 if not hasattr(self,
'start_time'):
192 print(
'Stopped loop following a UserWarning exception')
195 warning = self.
logger.warning
196 warning(
'number of events processed: {nEv}'.
format(nEv=iEv+1))
201 analyzer.endLoop(self.
setup)
204 warning(
"\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
205 warning(
"%9s %9s %9s %9s %6s %s" % (
"processed",
"all evts",
"time/proc",
" time/all",
" [%] ",
"analyer"))
206 warning(
"%9s %9s %9s %9s %6s %s" % (
"---------",
"--------",
"---------",
"---------",
" -----",
"-------------"))
207 sumtime = sum(rep[
'time']
for rep
in self.
timeReport)
210 timePerProcEv = rep[
'time']/(rep[
'events']-1)
if rep[
'events'] > 1
else 0
211 timePerAllEv = rep[
'time']/(allev-1)
if allev > 1
else 0
212 fracAllEv = rep[
'time']/sumtime
213 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( rep[
'events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
214 totPerProcEv = sumtime/(passev-1)
if passev > 1
else 0
215 totPerAllEv = sumtime/(allev-1)
if allev > 1
else 0
216 warning(
"%9s %9s %9s %9s %s" % (
"---------",
"--------",
"---------",
"---------",
"-------------"))
217 warning(
"%9d %9d %10.2f %10.2f %5.1f%% %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0,
"TOTAL"))
219 if hasattr(self.
events,
'endLoop'): self.
events.endLoop()
220 if hasattr(self.
config,
"preprocessor")
and self.
config.preprocessor
is not None:
221 if hasattr(self.
config.preprocessor,
"endLoop"):
225 """Run event processing for all analyzers in the sequence. 227 This function is called by self.loop, 228 but can also be called directly from 229 the python interpreter, to jump to a given event. 233 for i,analyzer
in enumerate(self.
analyzers):
234 if not analyzer.beginLoopCalled:
235 analyzer.beginLoop(self.
setup)
236 start = timeit.default_timer()
238 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
240 print(
"Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
242 ret = analyzer.process( self.
event )
244 memNow=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
246 print(
"Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s "%( analyzer.name, iEv, self.
memLast, memNow, memNow-self.
memLast))
251 self.
timeReport[i][
'time'] += timeit.default_timer() - start
253 return (
False, analyzer.name)
256 return (
True, analyzer.name)
259 """Writes all analyzers. 261 See Analyzer.Write for more information. 264 analyzer.write(self.
setup)
268 if __name__ ==
'__main__':
273 from PhysicsTools.HeppyCore.framework.heppy_loop
import _heppyGlobalOptions
274 from optparse
import OptionParser
275 parser = OptionParser(usage=
'%prog cfgFileName compFileName [--options=optFile.json]')
276 parser.add_option(
'--options',dest=
'options',default=
'',help=
'options json file')
277 (options,args) = parser.parse_args()
279 if options.options!=
'':
280 jsonfilename = options.options
281 jfile = open (jsonfilename,
'r') 282 opts=json.loads(jfile.readline()) 283 for k,v
in opts.items():
284 _heppyGlobalOptions[k]=v
288 cfgFileName = args[0]
289 pckfile = open( cfgFileName,
'r' ) 290 config = pickle.load( pckfile ) 291 comp = config.components[0] 292 events_class = config.events_class 293 elif len(args) == 2 :
294 cfgFileName = args[0]
295 file = open( cfgFileName,
'r' ) 296 cfg = imp.load_source( 'cfg', cfgFileName, file)
297 compFileName = args[1]
298 pckfile = open( compFileName,
'r' ) 299 comp = pickle.load( pckfile ) 300 cfg.config.components=[comp] 301 events_class = cfg.config.events_class 303 looper = Looper( 'Loop', cfg.config,nPrint = 5)
constexpr int32_t ceil(float num)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
def __init__(self, name, config, nEvents=None, firstEvent=0, nPrint=0, timeReport=False, quiet=False, memCheckFromEvent=-1)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
static std::string join(char **cmd)
def _prepareOutput(self, name)
def __init__(self, config, services)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)