7 s =
"""Purpose: Convert a cmsRun log with Tracer info into a stream stall graph.
9 edmStreamStallGrapher [-g[=arg]] <log file name>
11 Options: -g[=arg] instead of ascii art, create a pdf file of name
12 'arg' showing the work being done on each stream. If '=arg'
13 is not specified, the pdf file name is 'stall.pdf'. There
14 can be no spaces before or after the '=' sign.
16 To Use: Add the Tracer Service to the cmsRun job you want to check for
17 stream stalls. Make sure to use the 'printTimestamps' option
18 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
19 After running the job, execute this script and pass the name of the
20 log file to the script as the last command line argument.
22 To Read: The script will then print an 'ASCII art' stall graph which
23 consists of the name of the module which either started or stopped
24 running on a stream, and the number of modules running on each
25 stream at that moment in time. If the module just started, you
26 will also see the amount of time the module spent between finishing
27 its prefetching and starting. The state of a module is represented
30 plus ("+") the stream has just finished waiting and is starting a module
31 minus ("-") the stream just finished running a module
33 If a module had to wait more than 0.1 seconds, the end of the line
34 will have "STALLED". Once the first 4 events have finished
35 processing, the program prints "FINISH INIT". This is useful if one
36 wants to ignore stalls caused by startup actions, e.g. reading
39 Once the graph is completed, the program outputs the list of modules
40 which had the greatest total stall times. The list is sorted by
41 total stall time and written in descending order. In addition, the
42 list of all stall times for the module is given. """
55 kSourceFindEvent =
"sourceFindEvent"
56 kSourceDelayedRead =
"sourceDelayedRead"
57 kFinishInit =
"FINISH INIT"
64 foundEventToStartFrom =
False
68 if not l
or l[0] ==
'#':
69 if len(l) > 5
and l[0:2] ==
"#M":
70 (id,name)=tuple(l[2:].
split())
71 moduleNames[id] = name
73 (step,payload) = tuple(l.split(
None,1))
74 payload=payload.split()
78 stream = int(payload[0])
79 time = int(payload[-1])
81 if stream > numStreams:
84 if not foundEventToStartFrom:
86 if step ==
'E' and payload[-2] ==
'5':
87 foundEventToStartFrom =
True
88 processingSteps.append((kFinishInit,kFinished,stream,time))
93 if step ==
'E' or step ==
'e':
94 name = kSourceFindEvent
101 moduleID = payload[1]
106 if step ==
'p' or step ==
'M' or step ==
'm':
112 name = moduleNames[moduleID]
117 if step ==
'R' or step == 'r':
121 name = kSourceDelayedRead
123 if len(name) > maxNameSize:
124 maxNameSize = len(name)
126 processingSteps.append((name,trans,stream,time))
129 return (processingSteps,numStreams,maxNameSize)
133 time = line.split(
" ")[1]
134 time = time.split(
":")
135 time = int(time[0])*60*60+int(time[1])*60+float(time[2])
145 foundEventToStartFrom =
False
147 if not foundEventToStartFrom:
148 if l.find(
"event = 5") != -1:
149 foundEventToStartFrom =
True
150 stream = int( l[l.find(
"stream = ")+9])
151 processingSteps.append((kFinishInit,kFinished,stream,
getTime(l)-startTime))
152 if l.find(
"processing event :") != -1:
156 time = time - startTime
157 streamIndex = l.find(
"stream = ")
158 stream = int( l[streamIndex+9:l.find(
" ",streamIndex+10)])
159 name = kSourceFindEvent
162 if l.find(
"starting:") != -1:
164 processingSteps.append((name,trans,stream,time))
165 if stream > numStreams:
167 if l.find(
"processing event for module") != -1:
171 time = time - startTime
175 if l.find(
"finished:") != -1:
176 if l.find(
"prefetching") != -1:
181 if l.find(
"prefetching") != -1:
184 streamIndex = l.find(
"stream = ")
185 stream = int( l[streamIndex+9:l.find(
" ",streamIndex+10)])
186 name = l.split(
"'")[1]
187 if len(name) > maxNameSize:
188 maxNameSize = len(name)
189 processingSteps.append((name,trans,stream,time))
190 if stream > numStreams:
192 if l.find(
"event delayed read from source") != -1:
196 time = time - startTime
200 if l.find(
"finished:") != -1:
202 streamIndex = l.find(
"stream = ")
203 stream = int( l[streamIndex+9:l.find(
" ",streamIndex+10)])
204 name = kSourceDelayedRead
205 if len(name) > maxNameSize:
206 maxNameSize = len(name)
207 processingSteps.append((name,trans,stream,time))
208 if stream > numStreams:
211 return (processingSteps,numStreams,maxNameSize)
216 firstLine = inputFile.readline().rstrip()
219 if firstLine.find(
"# Step") != -1:
220 print "> ... Parsing StallMonitor output."
221 return parseStallMonitorOutput
222 elif firstLine.find(
"++") != -1:
225 print "> ... Parsing Tracer output."
226 return parseTracerOutput
229 print "Unknown input format."
234 f = open(fileName,
"r")
249 streamTime = [0]*(numStreams+1)
251 modulesActiveOnStream = [{}
for x
in xrange(0,numStreams+1)]
252 for n,trans,s,time
in processingSteps:
254 modulesOnStream = modulesActiveOnStream[s]
255 if trans == kPrefetchEnd:
256 modulesOnStream[n] = time
257 if trans == kStarted:
258 if n
in modulesOnStream:
259 waitTime = time - modulesOnStream[n]
260 if n == kSourceDelayedRead:
261 if 0 == len(modulesOnStream):
262 waitTime = time - streamTime[s]
263 if trans == kFinished:
264 if n != kSourceDelayedRead
and n!=kSourceFindEvent
and n!=kFinishInit:
265 del modulesOnStream[n]
267 if waitTime
is not None:
268 if waitTime > kStallThreshold:
269 t = stalledModules.setdefault(n,[])
271 return stalledModules
276 streamTime = [0]*(numStreams+1)
277 streamState = [0]*(numStreams+1)
278 modulesActiveOnStreams = [{}
for x
in xrange(0,numStreams+1)]
280 for n,trans,s,time
in processingSteps:
284 modulesActiveOnStream = modulesActiveOnStreams[s]
286 if trans == kPrefetchEnd:
287 modulesActiveOnStream[n] = time
289 if trans == kStarted:
290 if n != kSourceFindEvent:
292 if n
in modulesActiveOnStream:
293 waitTime = time - modulesActiveOnStream[n]
294 if n == kSourceDelayedRead:
295 if streamState[s] == 0:
296 waitTime = time-streamTime[s]
297 if trans == kFinished:
298 if n != kSourceDelayedRead
and n!=kSourceFindEvent:
299 del modulesActiveOnStream[n]
300 if n != kSourceFindEvent:
303 states =
"%-*s: " % (maxNameSize,n)
304 if trans == kStarted:
306 if trans == kFinished:
308 for index, state
in enumerate(streamState):
309 if n==kSourceFindEvent
and index == s:
312 states +=str(state)+
" "
313 if waitTime
is not None:
314 states +=
" %.2f"% (waitTime/1000.)
315 if waitTime > kStallThreshold
and seenInit:
316 states +=
" STALLED "+str(time/1000.)+
" "+str(s)
319 return stalledModules
325 for n,t
in stalledModules.iteritems():
327 if nameLength > maxNameSize:
328 maxNameSize = nameLength
330 priorities.append((n,sum(t),t))
333 return cmp(i[1],j[1])
334 priorities.sort(cmp=sumSort, reverse=
True)
336 nameColumn =
"Stalled Module"
337 if len(nameColumn) > maxNameSize:
338 maxNameSize = len(nameColumn)
340 stallColumn =
"Tot Stall Time"
341 stallColumnLength = len(stallColumn)
343 print "%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times"
344 for n,s,t
in priorities:
345 paddedName =
"%-*s:" % (maxNameSize,n)
346 print paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t])
352 oldStreamTimes = streamTimes
353 oldStreamColors = streamColors
355 streamTimes = [[]
for x
in xrange(numStreams+1)]
356 streamColors = [[]
for x
in xrange(numStreams+1)]
358 for s
in xrange(numStreams+1):
359 lastStartTime,lastTimeLength = oldStreamTimes[s][0]
360 lastColor = oldStreamColors[s][0]
361 for i
in xrange(1, len(oldStreamTimes[s])):
362 start,length = oldStreamTimes[s][i]
363 color = oldStreamColors[s][i]
364 if color == lastColor
and lastStartTime+lastTimeLength == start:
365 lastTimeLength += length
367 streamTimes[s].
append((lastStartTime,lastTimeLength))
368 streamColors[s].
append(lastColor)
369 lastStartTime = start
370 lastTimeLength = length
372 streamTimes[s].
append((lastStartTime,lastTimeLength))
373 streamColors[s].
append(lastColor)
375 return (streamTimes,streamColors)
381 matplotlib.use(
"PDF")
382 import matplotlib.pyplot
as plt
384 stalledModuleNames = set([ x
for x
in stalledModuleInfo.iterkeys()])
386 streamTimes = [[]
for x
in xrange(numStreams+1)]
387 streamColors = [[]
for x
in xrange(numStreams+1)]
388 modulesActiveOnStreams = [{}
for x
in xrange(0,numStreams+1)]
389 streamLastEventEndTimes = [
None]*(numStreams+1)
390 streamMultipleModulesRunningTimes = [[]
for x
in xrange(numStreams+1)]
391 maxNumberOfConcurrentModulesOnAStream = 0
392 streamInvertedMessageFromModule = [set()
for x
in xrange(numStreams+1)]
394 for n,trans,s,time
in processingSteps:
398 if streamLastEventEndTimes[s]
is None:
399 streamLastEventEndTimes[s]=time
402 if trans == kStarted:
403 if n == kSourceFindEvent:
407 startTime = streamLastEventEndTimes[s]
410 activeModules = modulesActiveOnStreams[s]
411 moduleNames = set(activeModules.iterkeys())
412 if n
in streamInvertedMessageFromModule[s]
and kTracerInput:
415 streamInvertedMessageFromModule[s].
remove(n)
417 activeModules[n] = time
418 nModulesRunning = len(activeModules)
419 if nModulesRunning > 1:
420 streamMultipleModulesRunningTimes[s].
append([nModulesRunning, time,
None])
421 if nModulesRunning > maxNumberOfConcurrentModulesOnAStream:
422 maxNumberOfConcurrentModulesOnAStream = nModulesRunning
424 startTime =
min(activeModules.itervalues())
425 for k
in activeModules.iterkeys():
426 activeModules[k]=time
428 if trans == kFinished:
429 if n == kSourceFindEvent:
430 streamLastEventEndTimes[s]=time
432 activeModules = modulesActiveOnStreams[s]
433 if n
not in activeModules
and kTracerInput:
436 streamInvertedMessageFromModule[s].
add(n)
438 startTime = activeModules[n]
439 moduleNames = set(activeModules.iterkeys())
441 nModulesRunning = len(activeModules)
442 if nModulesRunning > 0:
443 streamMultipleModulesRunningTimes[s][-1][2]=time
446 for k
in activeModules.iterkeys():
447 activeModules[k] = time
448 if startTime
is not None:
450 if (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
452 streamTimes[s].
append((startTime,time-startTime))
453 for n
in moduleNames:
454 if n
in stalledModuleNames:
462 fig, ax = plt.subplots()
463 ax.set_xlabel(
"Time (sec)")
464 ax.set_ylabel(
"Stream ID")
466 height = 0.8/maxNumberOfConcurrentModulesOnAStream
468 def scaleToSeconds(times):
469 return [(x[0]/1000.,x[1]/1000.)
for x
in times]
471 for i,times
in enumerate(streamTimes):
472 ax.broken_barh(scaleToSeconds(times),(i-0.4,height),facecolors=streamColors[i],edgecolors=streamColors[i],linewidth=0)
475 if maxNumberOfConcurrentModulesOnAStream > 1:
476 for i,occurrences
in enumerate(streamMultipleModulesRunningTimes):
477 for info
in occurrences:
480 times = (info[1], info[2]-info[1])
481 ax.broken_barh(scaleToSeconds([times]), (i-0.4+height, height*(info[0]-1)), facecolors=
"blue",edgecolors=
"blue",linewidth=0)
483 fig.text(0.1, 0.95,
"modules running", color =
"green", horizontalalignment =
'left')
484 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
485 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
486 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
487 print "> ... Saving to file: '{}'".
format(pdfFile)
491 if __name__==
"__main__":
495 if argc
not in [2,3]:
496 sys.stderr.write(
"\n\033[1mERROR:\033[0m Wrong number of arguments specified ({}). Should be 2 or 3.\n\n".
format(argc))
506 elif arg.find(
"-g=") != -1:
508 pdfFile = arg.split(
'=')[1]
509 if not re.match(
r'^[\w\.]+$', pdfFile):
510 print "Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile)
511 print "Only characters 0-9, a-z, A-Z, '_', and '.' are allowed."
514 print "Unknown argument ",sys.argv[1]
517 fileName =sys.argv[-1]
519 sys.stderr.write(
">reading file\n" )
520 processingSteps,numStreams,maxNameSize =
readLogFile(sys.argv[-1])
521 sys.stderr.write(
">processing data\n")
524 sys.stderr.write(
">preparing ASCII art\n")
527 sys.stderr.write(
">creating PDF\n")
528 createPDFImage(pdfFile, processingSteps, numStreams, stalledModules)
boost::dynamic_bitset append(const boost::dynamic_bitset<> &bs1, const boost::dynamic_bitset<> &bs2)
this method takes two bitsets bs1 and bs2 and returns result of bs2 appended to the end of bs1 ...
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
def consolidateContiguousBlocks
def parseStallMonitorOutput
static std::string join(char **cmd)
def printStalledModulesInOrder