2 from itertools
import groupby
3 from operator
import attrgetter,itemgetter
9 To Use: Add the Tracer Service to the cmsRun job you want to check for 10 stream stalls. Make sure to use the 'printTimestamps' option 11 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True)) 12 After running the job, execute this script and pass the name of the 13 log file to the script as the only command line argument. 15 To Read: The script will then print an 'ASCII art' stall graph which 16 consists of the name of the module which either started or stopped 17 running on a stream, and the number of modules running on each 18 stream at that moment in time. If the module just started, you 19 will also see the amount of time the module spent between finishing 20 its prefetching and starting. The state of a module is represented by: 23 plus ("+") the stream has just finished waiting and is starting a module 24 minus ("-") the stream just finished running a module 26 If a module had to wait more than 0.1 seconds, the end of the line 27 will have "STALLED". Once the first 4 events have finished 28 processing, the program prints "FINISH INIT". This is useful if one 29 wants to ignore stalls caused by startup actions, e.g. reading 32 Once the graph is completed, the program outputs the list of modules 33 which had the greatest total stall times. The list is sorted by 34 total stall time and written in descending order. In addition, the 35 list of all stall times for the module is given.''' 48 kSourceFindEvent =
"sourceFindEvent" 49 kSourceDelayedRead =
"sourceDelayedRead" 56 foundEventToStartFrom =
False 60 if not l
or l[0] ==
'#':
61 if len(l) > 5
and l[0:2] ==
"#M":
62 (id,name)=tuple(l[2:].
split())
63 moduleNames[id] = name
65 (step,payload) = tuple(l.split(
None,1))
66 payload=payload.split()
70 stream =
int(payload[0])
71 time =
int(payload[-1])
74 numStreams =
max(numStreams, stream+1)
78 if step ==
'S' or step ==
's':
79 name = kSourceFindEvent
91 if step ==
'p' or step ==
'M' or step ==
'm':
97 name = moduleNames[moduleID]
102 if step ==
'R' or step == 'r': 106 name = kSourceDelayedRead 108 maxNameSize = max(maxNameSize, len(name)) 109 processingSteps.append((name,trans,stream,time)) 112 return (processingSteps,numStreams,maxNameSize)
116 time = line.split(
" ")[1]
117 time = time.split(
":")
118 time =
int(time[0])*60*60+
int(time[1])*60+
float(time[2])
119 time =
int(1000*time)
129 if l.find(
"processing event :") != -1:
133 time = time - startTime
134 streamIndex = l.find(
"stream = ")
135 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
136 name = kSourceFindEvent
139 if l.find(
"starting:") != -1:
141 processingSteps.append((name,trans,stream,time))
142 numStreams =
max(numStreams, stream+1)
143 if l.find(
"processing event for module") != -1:
147 time = time - startTime
151 if l.find(
"finished:") != -1:
152 if l.find(
"prefetching") != -1:
157 if l.find(
"prefetching") != -1:
160 streamIndex = l.find(
"stream = ")
161 stream =
int( l[streamIndex+9:l.find(
" ",streamIndex+10)])
162 name = l.split(
"'")[1]
163 maxNameSize =
max(maxNameSize, len(name))
164 processingSteps.append((name,trans,stream,time))
165 numStreams =
max(numStreams, stream+1)
166 if l.find(
"event delayed read from source") != -1:
170 time = time - startTime
174 if l.find(
"finished:") != -1:
176 streamIndex = l.find(
"stream = ")
177 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
178 name = kSourceDelayedRead
179 maxNameSize =
max(maxNameSize, len(name))
180 processingSteps.append((name,trans,stream,time))
181 numStreams =
max(numStreams, stream+1)
183 return (processingSteps,numStreams,maxNameSize)
188 firstLine = inputFile.readline().rstrip()
191 if firstLine.find(
"# Step") != -1:
192 print "> ... Parsing StallMonitor output." 193 return parseStallMonitorOutput
194 elif firstLine.find(
"++") != -1:
197 print "> ... Parsing Tracer output." 198 return parseTracerOutput
201 print "Unknown input format." 220 streamTime = [0]*numStreams
222 modulesActiveOnStream = [{}
for x
in xrange(numStreams)]
223 for n,trans,s,time
in processingSteps:
225 modulesOnStream = modulesActiveOnStream[s]
226 if trans == kPrefetchEnd:
227 modulesOnStream[n] = time
228 if trans == kStarted:
229 if n
in modulesOnStream:
230 waitTime = time - modulesOnStream[n]
231 if n == kSourceDelayedRead:
232 if 0 == len(modulesOnStream):
233 waitTime = time - streamTime[s]
234 if trans == kFinished:
235 if n != kSourceDelayedRead
and n!=kSourceFindEvent:
236 modulesOnStream.pop(n,
None)
238 if waitTime
is not None:
239 if waitTime > kStallThreshold:
240 t = stalledModules.setdefault(n,[])
242 return stalledModules
247 streamTime = [0]*numStreams
248 streamState = [0]*numStreams
249 modulesActiveOnStreams = [{}
for x
in xrange(numStreams)]
250 for n,trans,s,time
in processingSteps:
251 modulesActiveOnStream = modulesActiveOnStreams[s]
253 if trans == kPrefetchEnd:
254 modulesActiveOnStream[n] = time
256 if trans == kStarted:
257 if n != kSourceFindEvent:
259 if n
in modulesActiveOnStream:
260 waitTime = time - modulesActiveOnStream[n]
261 if n == kSourceDelayedRead:
262 if streamState[s] == 0:
263 waitTime = time-streamTime[s]
264 if trans == kFinished:
265 if n != kSourceDelayedRead
and n!=kSourceFindEvent:
266 modulesActiveOnStream.pop(n,
None)
267 if n != kSourceFindEvent:
270 states =
"%-*s: " % (maxNameSize,n)
271 if trans == kStarted:
273 if trans == kFinished:
275 for index, state
in enumerate(streamState):
276 if n==kSourceFindEvent
and index == s:
279 states +=
str(state)+
" " 280 if waitTime
is not None:
281 states +=
" %.2f"% (waitTime/1000.)
282 if waitTime > kStallThreshold:
283 states +=
" STALLED "+
str(time/1000.)+
" "+
str(s)
286 return stalledModules
292 for name,t
in stalledModules.iteritems():
293 maxNameSize =
max(maxNameSize, len(name))
295 priorities.append((name,sum(t),t))
298 return cmp(i[1],j[1])
299 priorities.sort(cmp=sumSort, reverse=
True)
301 nameColumn =
"Stalled Module" 302 maxNameSize =
max(maxNameSize, len(nameColumn))
304 stallColumn =
"Tot Stall Time" 305 stallColumnLength = len(stallColumn)
307 print "%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times" 308 for n,s,t
in priorities:
309 paddedName =
"%-*s:" % (maxNameSize,n)
310 print paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t])
319 return "(x: {}, y: {})".
format(self.
x,self.
y)
334 reducedPoints.append(tmp)
336 reducedPoints.append(tmp)
337 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
343 for pairList
in pairLists:
344 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
345 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
346 points.sort(key=attrgetter(
'x'))
358 if len(self.
data) != 0:
359 tmp += self.
data[-1][1]
361 tmp.sort(key=attrgetter(
'x'))
363 self.data.append((graphType, tmp))
380 oldStreamInfo = streamInfo
381 streamInfo = [[]
for x
in xrange(numStreams)]
383 for s
in xrange(numStreams):
384 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
385 for info
in oldStreamInfo[s][1:]:
386 start,length,color = info.unpack()
387 if color == lastColor
and lastStartTime+lastTimeLength == start:
388 lastTimeLength += length
391 lastStartTime = start
392 lastTimeLength = length
406 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
407 for start,length,height
in oldBlocks[1:]:
408 if height == lastHeight
and lastStartTime+lastTimeLength == start:
409 lastTimeLength += length
411 blocks.append((lastStartTime,lastTimeLength,lastHeight))
412 lastStartTime = start
413 lastTimeLength = length
415 blocks.append((lastStartTime,lastTimeLength,lastHeight))
420 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo):
422 stalledModuleNames = set([x
for x
in stalledModuleInfo.iterkeys()])
423 streamInfo = [[]
for x
in xrange(numStreams)]
424 modulesActiveOnStreams = [{}
for x
in xrange(numStreams)]
425 streamLastEventEndTimes = [
None]*numStreams
426 streamRunningTimes = [[]
for x
in xrange(numStreams)]
427 maxNumberOfConcurrentModulesOnAStream = 1
428 streamInvertedMessageFromModule = [set()
for x
in xrange(numStreams)]
430 for n,trans,s,time
in processingSteps:
432 if streamLastEventEndTimes[s]
is None:
433 streamLastEventEndTimes[s]=time
434 if trans == kStarted:
435 if n == kSourceFindEvent:
439 startTime = streamLastEventEndTimes[s]
442 activeModules = modulesActiveOnStreams[s]
443 moduleNames = set(activeModules.iterkeys())
444 if n
in streamInvertedMessageFromModule[s]
and kTracerInput:
447 streamInvertedMessageFromModule[s].
remove(n)
449 activeModules[n] = time
450 nModulesRunning = len(activeModules)
452 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nModulesRunning)
453 if nModulesRunning > 1:
455 startTime =
min(activeModules.itervalues())
456 for k
in activeModules.iterkeys():
457 activeModules[k]=time
459 if trans == kFinished:
460 if n == kSourceFindEvent:
461 streamLastEventEndTimes[s]=time
463 activeModules = modulesActiveOnStreams[s]
464 if n
not in activeModules
and kTracerInput:
467 streamInvertedMessageFromModule[s].
add(n)
470 startTime = activeModules[n]
471 moduleNames = set(activeModules.iterkeys())
473 nModulesRunning = len(activeModules)
474 if nModulesRunning > 0:
477 for k
in activeModules.iterkeys():
478 activeModules[k] = time
479 if startTime
is not None:
481 if (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
484 for n
in moduleNames:
485 if n
in stalledModuleNames:
495 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
498 [xH,yH] = fig.get_size_inches()
499 fig.set_size_inches(xH,yH*4/3)
500 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
501 axStack = plt.subplot2grid((4,1),(3,0))
503 ax.set_xlabel(
"Time (sec)")
504 ax.set_ylabel(
"Stream ID")
505 ax.set_ylim(-0.5,numStreams-0.5)
506 ax.yaxis.set_ticks(xrange(numStreams))
508 height = 0.8/maxNumberOfConcurrentModulesOnAStream
509 allStackTimes={
'green': [],
'red': [],
'blue': [],
'orange': []}
510 for i,perStreamInfo
in enumerate(streamInfo):
511 times=[(x.begin/1000., x.delta/1000.)
for x
in perStreamInfo]
512 colors=[x.color
for x
in perStreamInfo]
513 ax.broken_barh(times,(i-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
514 for info
in perStreamInfo:
515 allStackTimes[info.color].
append((info.begin, info.delta))
518 if maxNumberOfConcurrentModulesOnAStream > 1:
520 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
521 perStreamTimes = sorted(perStreamRunningTimes, key=attrgetter(
'x'))
525 for t1,t2
in zip(perStreamTimes, perStreamTimes[1:]):
529 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
530 preparedTimes.sort(key=itemgetter(2))
532 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
533 theTS = [(t[0],t[1])
for t
in ts]
534 theTimes = [(t[0]/1000.,t[1]/1000.)
for t
in theTS]
535 yspan = (i-0.4+height,height*(nthreads-1))
536 ax.broken_barh(theTimes, yspan, facecolors=
'blue', edgecolors=
'blue', linewidth=0)
537 allStackTimes[
'blue'].extend(theTS*(nthreads-1))
540 print "> ... Generating stack" 542 for color
in [
'green',
'blue',
'red',
'orange']:
543 tmp = allStackTimes[color]
545 stack.update(color, tmp)
547 for stk
in reversed(stack.data):
553 for p1,p2
in zip(stk[1], stk[1][1:]):
555 xs.append((p1.x, p2.x-p1.x, height))
556 xs.sort(key = itemgetter(2))
559 for height, xpairs
in groupby(xs, itemgetter(2)):
560 finalxs = [(e[0]/1000.,e[1]/1000.)
for e
in xpairs]
561 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
563 axStack.set_xlabel(
"Time (sec)");
564 axStack.set_ylabel(
"# threads");
565 axStack.set_xlim(ax.get_xlim())
566 axStack.tick_params(top=
'off')
568 fig.text(0.1, 0.95,
"modules running", color =
"green", horizontalalignment =
'left')
569 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
570 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
571 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
572 print "> ... Saving to file: '{}'".
format(pdfFile)
576 if __name__==
"__main__":
582 parser =
argparse.ArgumentParser(description=
'Convert a cmsRun log with Tracer info into a stream stall graph.',
585 parser.add_argument(
'filename',
587 help='log file to process')
588 parser.add_argument(
'-g',
'--graph',
590 metavar=
"'stall.pdf'",
593 help=
'''Create pdf file of stream stall graph. If -g is specified 594 by itself, the default file name is \'stall.pdf\'. Otherwise, the 595 argument to the -g option is the filename.''')
596 parser.add_argument(
'-s',
'--stack',
598 help=
'''Create stack plot, combining all stream-specific info. 599 Can be used only when -g is specified.''')
600 args = parser.parse_args()
603 inputFile = args.filename
605 shownStacks = args.stack
608 if pdfFile
is not None:
612 matplotlib.use(
"PDF")
613 import matplotlib.pyplot
as plt
614 if not re.match(
r'^[\w\.]+$', pdfFile):
615 print "Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile)
616 print "Only characters 0-9, a-z, A-Z, '_', and '.' are allowed." 620 extension = pdfFile.split(
'.')[-1]
621 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
622 if not extension
in supported_filetypes:
623 print "A graph cannot be saved to a filename with extension '{}'.".
format(extension)
624 print "The allowed extensions are:" 625 for filetype
in supported_filetypes:
626 print " '.{}'".
format(filetype)
629 if pdfFile
is None and shownStacks:
630 print "The -s (--stack) option can be used only when the -g (--graph) option is specified." 633 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
634 processingSteps,numStreams,maxNameSize =
readLogFile(inputFile)
635 sys.stderr.write(
">processing data\n")
638 sys.stderr.write(
">preparing ASCII art\n")
641 sys.stderr.write(
">creating PDF\n")
642 createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModules)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def mergeContiguousBlocks(blocks)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def chooseParser(inputFile)
def parseStallMonitorOutput(f)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
def reduceSortedPoints(ps)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
def remove(d, key, TELL=False)
def readLogFile(inputFile)
def __init__(self, x_, y_)
def adjacentDiff(pairLists)