2 from __future__
import print_function
3 from builtins
import range
4 from itertools
import groupby
5 from operator
import attrgetter,itemgetter
7 from collections
import defaultdict
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

After running the job, execute this script and pass the name of the
StallMonitor log file to the script.
By default, the script will then print an 'ASCII art' stall graph
which consists of a line of text for each time a module or the
source stops or starts. Each line contains the name of the module
which either started or stopped running, and the number of modules
running on each stream at that moment in time. After that come
the time and the stream number. Then, if a module just started, you
will also see the amount of time the module spent between finishing
its prefetching and starting. The state of a module is represented
by a symbol:

  plus  ("+") the stream has just finished waiting and is starting a module
  minus ("-") the stream just finished running a module

If a module had to wait more than 0.1 seconds, the end of the line
will have "STALLED". Startup actions, e.g. reading conditions,
may affect results for the first few events.
Using the command line arguments described above you can make the
program create a PDF file with actual graphs instead of the 'ASCII art'
output.

Once the graph is completed, the program outputs the list of modules
which had the greatest total stall times. The list is sorted by
total stall time and written in descending order. In addition, the
list of all stall times for each module is given.
There is an inferior alternative (an old, obsolete way).
Instead of using the StallMonitor Service, you can use the
Tracer Service. Make sure to use the 'printTimestamps' option:

  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))

There are problems associated with this and it is not recommended.'''
53 kStallThreshold=100000
64 kStartedSourceDelayedRead=7
65 kFinishedSourceDelayedRead=8
68 kSourceFindEvent =
"sourceFindEvent"
69 kSourceDelayedRead =
"sourceDelayedRead"
75 if not l
or l[0] ==
'#':
77 (step,payload) = tuple(l.split(
None,1))
78 payload=payload.split()
81 if step ==
'E' or step ==
'e':
86 stream =
int(payload[0])
87 time =
int(payload[-1])
94 if step ==
'S' or step ==
's':
95 name = kSourceFindEvent
96 trans = kStartedSource
99 trans = kFinishedSource
102 moduleID = payload[1]
107 if step ==
'p' or step ==
'M' or step ==
'm':
113 if step ==
'm' or step ==
'M':
114 isEvent = (
int(payload[2]) == 0)
115 name = moduleNames[moduleID]
119 elif step ==
'A' or step ==
'a':
120 trans = kStartedAcquire
122 trans = kFinishedAcquire
123 name = moduleNames[moduleID]
128 elif step ==
'R' or step ==
'r':
129 trans = kStartedSourceDelayedRead
131 trans = kFinishedSourceDelayedRead
132 name = kSourceDelayedRead
134 if trans
is not None:
135 yield (name,trans,stream,time, isEvent)
142 numStreamsFromSource = 0
146 if l
and l[0] ==
'M':
150 numStreams =
int(i[1])+1
152 if numStreams == 0
and l
and l[0] ==
'S':
153 s =
int(l.split(
' ')[1])
154 if s > numStreamsFromSource:
155 numStreamsFromSource = s
156 if len(l) > 5
and l[0:2] ==
"#M":
157 (id,name)=tuple(l[2:].
split())
158 moduleNames[id] = name
162 numStreams = numStreamsFromSource +1
166 for n
in six.iteritems(moduleNames):
171 """Create a generator which can step through the file and return each processing step.
172 Using a generator reduces the memory overhead when parsing a large file.
180 time = line.split(
" ")[1]
181 time = time.split(
":")
182 time =
int(time[0])*60*60+
int(time[1])*60+
float(time[2])
183 time =
int(1000000*time)
214 streamsThatSawFirstEvent = set()
222 if l.find(
"processing event :") != -1:
223 name = kSourceFindEvent
224 trans = kStartedSource
226 if l.find(
"starting:") != -1:
227 trans = kFinishedSource
228 elif l.find(
"processing event for module") != -1:
230 if l.find(
"finished:") != -1:
231 if l.find(
"prefetching") != -1:
236 if l.find(
"prefetching") != -1:
239 name = l.split(
"'")[1]
240 elif l.find(
"processing event acquire for module:") != -1:
241 trans = kStartedAcquire
242 if l.find(
"finished:") != -1:
243 trans = kFinishedAcquire
244 name = l.split(
"'")[1]
245 elif l.find(
"event delayed read from source") != -1:
246 trans = kStartedSourceDelayedRead
247 if l.find(
"finished:") != -1:
248 trans = kFinishedSourceDelayedRead
249 name = kSourceDelayedRead
250 if trans
is not None:
254 time = time - startTime
255 streamIndex = l.find(
"stream = ")
256 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
257 maxNameSize =
max(maxNameSize, len(name))
259 if trans == kFinishedSource
and not stream
in streamsThatSawFirstEvent:
262 processingSteps.append((name,kStartedSource,stream,time,
True))
263 streamsThatSawFirstEvent.add(stream)
265 processingSteps.append((name,trans,stream,time,
True))
266 numStreams =
max(numStreams, stream+1)
269 return (processingSteps,numStreams,maxNameSize)
275 return self._processingSteps
280 firstLine = inputFile.readline().rstrip()
284 fifthLine = inputFile.readline().rstrip()
286 if (firstLine.find(
"# Transition") != -1)
or (firstLine.find(
"# Step") != -1):
287 print(
"> ... Parsing StallMonitor output.")
288 return StallMonitorParser
290 if firstLine.find(
"++") != -1
or fifthLine.find(
"++") != -1:
293 print(
"> ... Parsing Tracer output.")
297 print(
"Unknown input format.")
303 return parseInput(inputFile)
317 streamTime = [0]*numStreams
318 streamState = [0]*numStreams
320 modulesActiveOnStream = [{}
for x
in range(numStreams)]
321 for n,trans,s,time,isEvent
in processingSteps:
324 modulesOnStream = modulesActiveOnStream[s]
325 if trans == kPrefetchEnd:
326 modulesOnStream[n] = time
327 elif trans == kStarted
or trans == kStartedAcquire:
328 if n
in modulesOnStream:
329 waitTime = time - modulesOnStream[n]
330 modulesOnStream.pop(n,
None)
332 elif trans == kFinished
or trans == kFinishedAcquire:
335 elif trans == kStartedSourceDelayedRead:
336 if streamState[s] == 0:
337 waitTime = time - streamTime[s]
338 elif trans == kStartedSource:
339 modulesOnStream.clear()
340 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
342 if waitTime
is not None:
343 if waitTime > kStallThreshold:
344 t = stalledModules.setdefault(n,[])
346 return stalledModules
351 streamTime = [0]*numStreams
352 streamState = [0]*numStreams
353 moduleTimings = defaultdict(list)
354 modulesActiveOnStream = [defaultdict(int)
for x
in range(numStreams)]
355 for n,trans,s,time,isEvent
in processingSteps:
357 modulesOnStream = modulesActiveOnStream[s]
359 if trans == kStarted:
361 modulesOnStream[n]=time
362 elif trans == kFinished:
363 waitTime = time - modulesOnStream[n]
364 modulesOnStream.pop(n,
None)
368 with open(
'module-timings.json',
'w')
as outfile:
369 outfile.write(json.dumps(moduleTimings, indent=4))
373 streamTime = [0]*numStreams
374 streamState = [0]*numStreams
375 modulesActiveOnStreams = [{}
for x
in range(numStreams)]
376 for n,trans,s,time,isEvent
in processingSteps:
378 modulesActiveOnStream = modulesActiveOnStreams[s]
379 if trans == kPrefetchEnd:
380 modulesActiveOnStream[n] = time
382 elif trans == kStartedAcquire
or trans == kStarted:
383 if n
in modulesActiveOnStream:
384 waitTime = time - modulesActiveOnStream[n]
385 modulesActiveOnStream.pop(n,
None)
387 elif trans == kFinishedAcquire
or trans == kFinished:
390 elif trans == kStartedSourceDelayedRead:
391 if streamState[s] == 0:
392 waitTime = time - streamTime[s]
393 elif trans == kStartedSource:
394 modulesActiveOnStream.clear()
395 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
397 states =
"%-*s: " % (maxNameSize,n)
398 if trans == kStartedAcquire
or trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedSource:
402 for index, state
in enumerate(streamState):
403 if n==kSourceFindEvent
and index == s:
406 states +=
str(state)+
" "
407 states +=
" -- " +
str(time/1000.) +
" " +
str(s) +
" "
408 if waitTime
is not None:
409 states +=
" %.2f"% (waitTime/1000.)
410 if waitTime > kStallThreshold:
419 for name,t
in six.iteritems(stalledModules):
420 maxNameSize =
max(maxNameSize, len(name))
422 priorities.append((name,sum(t),t))
424 priorities.sort(key=
lambda a: a[1], reverse=
True)
426 nameColumn =
"Stalled Module"
427 maxNameSize =
max(maxNameSize, len(nameColumn))
429 stallColumn =
"Tot Stall Time"
430 stallColumnLength = len(stallColumn)
432 print(
"%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times")
433 for n,s,t
in priorities:
434 paddedName =
"%-*s:" % (maxNameSize,n)
435 print(paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t]))
444 return "(x: {}, y: {})".
format(self.
x,self.
y)
454 tmp =
Point(ps[0].x, ps[0].y)
459 reducedPoints.append(tmp)
460 tmp =
Point(p.x, p.y)
461 reducedPoints.append(tmp)
462 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
468 for pairList
in pairLists:
469 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
470 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
471 points.sort(key=attrgetter(
'x'))
483 if len(self.
data) != 0:
484 tmp += self.
data[-1][1]
486 tmp.sort(key=attrgetter(
'x'))
505 oldStreamInfo = streamInfo
506 streamInfo = [[]
for x
in range(numStreams)]
508 for s
in range(numStreams):
510 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
511 for info
in oldStreamInfo[s][1:]:
512 start,length,color = info.unpack()
513 if color == lastColor
and lastStartTime+lastTimeLength == start:
514 lastTimeLength += length
517 lastStartTime = start
518 lastTimeLength = length
535 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
536 for start,length,height
in oldBlocks[1:]:
537 if height == lastHeight
and lastStartTime+lastTimeLength == start:
538 lastTimeLength += length
540 blocks.append((lastStartTime,lastTimeLength,lastHeight))
541 lastStartTime = start
542 lastTimeLength = length
544 blocks.append((lastStartTime,lastTimeLength,lastHeight))
550 points = sorted(points, key=attrgetter(
'x'))
554 for t1,t2
in zip(points, points[1:]):
562 if streamHeight < streamHeightCut:
564 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
565 preparedTimes.sort(key=itemgetter(2))
568 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
569 theTS = [(t[0],t[1])
for t
in ts]
571 theTimes = [(t[0]/1000000.,t[1]/1000000.)
for t
in theTS]
572 yspan = (stream-0.4+height,height*(nthreads-1))
573 ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
575 allStackTimes[color].extend(theTS*(nthreads-threadOffset))
578 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
580 stalledModuleNames = set([x
for x
in stalledModuleInfo.iterkeys()])
581 streamLowestRow = [[]
for x
in range(numStreams)]
582 modulesActiveOnStreams = [set()
for x
in range(numStreams)]
583 acquireActiveOnStreams = [set()
for x
in range(numStreams)]
584 externalWorkOnStreams = [set()
for x
in range(numStreams)]
585 previousFinishTime = [
None for x
in range(numStreams)]
586 streamRunningTimes = [[]
for x
in range(numStreams)]
587 streamExternalWorkRunningTimes = [[]
for x
in range(numStreams)]
588 maxNumberOfConcurrentModulesOnAStream = 1
589 externalWorkModulesInJob =
False
590 previousTime = [0
for x
in range(numStreams)]
593 finishBeforeStart = [set()
for x
in range(numStreams)]
594 finishAcquireBeforeStart = [set()
for x
in range(numStreams)]
595 countSource = [0
for x
in range(numStreams)]
596 countDelayedSource = [0
for x
in range(numStreams)]
597 countExternalWork = [defaultdict(int)
for x
in range(numStreams)]
600 for n,trans,s,time,isEvent
in processingSteps:
601 if timeOffset
is None:
606 if time < previousTime[s]:
607 time = previousTime[s]
608 previousTime[s] = time
610 activeModules = modulesActiveOnStreams[s]
611 acquireModules = acquireActiveOnStreams[s]
612 externalWorkModules = externalWorkOnStreams[s]
614 if trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedAcquire
or trans == kStartedSource :
621 if trans == kStarted:
622 countExternalWork[s][n] -= 1
623 if n
in finishBeforeStart[s]:
624 finishBeforeStart[s].
remove(n)
626 elif trans == kStartedAcquire:
627 if n
in finishAcquireBeforeStart[s]:
628 finishAcquireBeforeStart[s].
remove(n)
631 if trans == kStartedSourceDelayedRead:
632 countDelayedSource[s] += 1
633 if countDelayedSource[s] < 1:
635 elif trans == kStartedSource:
637 if countSource[s] < 1:
640 moduleNames = activeModules.copy()
641 moduleNames.update(acquireModules)
642 if trans == kStartedAcquire:
643 acquireModules.add(n)
647 if moduleNames
or externalWorkModules:
648 startTime = previousFinishTime[s]
649 previousFinishTime[s] = time
651 if trans == kStarted
and n
in externalWorkModules:
652 externalWorkModules.remove(n)
653 streamExternalWorkRunningTimes[s].
append(
Point(time, -1))
655 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
656 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
657 elif trans == kFinished
or trans == kFinishedSourceDelayedRead
or trans == kFinishedAcquire
or trans == kFinishedSource :
659 if trans == kFinished:
660 if n
not in activeModules:
661 finishBeforeStart[s].
add(n)
664 if trans == kFinishedSourceDelayedRead:
665 countDelayedSource[s] -= 1
666 if countDelayedSource[s] < 0:
668 elif trans == kFinishedSource:
670 if countSource[s] < 0:
673 if trans == kFinishedAcquire:
675 countExternalWork[s][n] += 1
676 if displayExternalWork:
677 externalWorkModulesInJob =
True
678 if (
not checkOrder)
or countExternalWork[s][n] > 0:
679 externalWorkModules.add(n)
680 streamExternalWorkRunningTimes[s].
append(
Point(time,+1))
681 if checkOrder
and n
not in acquireModules:
682 finishAcquireBeforeStart[s].
add(n)
685 startTime = previousFinishTime[s]
686 previousFinishTime[s] = time
687 moduleNames = activeModules.copy()
688 moduleNames.update(acquireModules)
690 if trans == kFinishedAcquire:
691 acquireModules.remove(n)
692 elif trans == kFinishedSourceDelayedRead:
693 if countDelayedSource[s] == 0:
694 activeModules.remove(n)
695 elif trans == kFinishedSource:
696 if countSource[s] == 0:
697 activeModules.remove(n)
699 activeModules.remove(n)
701 if startTime
is not None:
707 elif (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
710 for n
in moduleNames:
711 if n
in stalledModuleNames:
720 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
723 [xH,yH] = fig.get_size_inches()
724 fig.set_size_inches(xH,yH*4/3)
725 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
726 axStack = plt.subplot2grid((4,1),(3,0))
728 ax.set_xlabel(
"Time (sec)")
729 ax.set_ylabel(
"Stream ID")
730 ax.set_ylim(-0.5,numStreams-0.5)
731 ax.yaxis.set_ticks(
range(numStreams))
733 ax.set_xlim((xLower, xUpper))
735 height = 0.8/maxNumberOfConcurrentModulesOnAStream
736 allStackTimes={
'green': [],
'limegreen':[],
'red': [],
'blue': [],
'orange': [],
'darkviolet': []}
737 for iStream,lowestRow
in enumerate(streamLowestRow):
738 times=[(x.begin/1000000., x.delta/1000000.)
for x
in lowestRow]
739 colors=[x.color
for x
in lowestRow]
741 ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
744 for info
in lowestRow:
745 if not info.color ==
'darkviolet':
746 allStackTimes[info.color].
append((info.begin, info.delta))
749 if maxNumberOfConcurrentModulesOnAStream > 1
or externalWorkModulesInJob:
751 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
753 perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
754 perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
757 allStackTimes, ax, i, height,
760 addToStackTimes=
False,
765 allStackTimes, ax, i, height,
768 addToStackTimes=
True,
773 allStackTimes, ax, i, height,
776 addToStackTimes=
True,
781 print(
"> ... Generating stack")
783 for color
in [
'green',
'limegreen',
'blue',
'red',
'orange',
'darkviolet']:
784 tmp = allStackTimes[color]
786 stack.update(color, tmp)
788 for stk
in reversed(stack.data):
794 for p1,p2
in zip(stk[1], stk[1][1:]):
796 xs.append((p1.x, p2.x-p1.x, height))
797 xs.sort(key = itemgetter(2))
800 for height, xpairs
in groupby(xs, itemgetter(2)):
801 finalxs = [(e[0]/1000000.,e[1]/1000000.)
for e
in xpairs]
803 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
805 axStack.set_xlabel(
"Time (sec)");
806 axStack.set_ylabel(
"# modules");
807 axStack.set_xlim(ax.get_xlim())
808 axStack.tick_params(top=
'off')
810 fig.text(0.1, 0.95,
"modules running event", color =
"green", horizontalalignment =
'left')
811 fig.text(0.1, 0.92,
"modules running other", color =
"limegreen", horizontalalignment =
'left')
812 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
813 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
814 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
815 if displayExternalWork:
816 fig.text(0.9, 0.92,
"external work", color =
"darkviolet", horizontalalignment =
'right')
817 print(
"> ... Saving to file: '{}'".
format(pdfFile))
821 if __name__==
"__main__":
827 parser = argparse.ArgumentParser(description=
'Convert a text file created by cmsRun into a stream stall graph.',
828 formatter_class=argparse.RawDescriptionHelpFormatter,
830 parser.add_argument(
'filename',
831 type=argparse.FileType(
'r'),
832 help=
'file to process')
833 parser.add_argument(
'-g',
'--graph',
835 metavar=
"'stall.pdf'",
838 help=
'''Create pdf file of stream stall graph. If -g is specified
839 by itself, the default file name is \'stall.pdf\'. Otherwise, the
840 argument to the -g option is the filename.''')
841 parser.add_argument(
'-s',
'--stack',
843 help=
'''Create stack plot, combining all stream-specific info.
844 Can be used only when -g is specified.''')
845 parser.add_argument(
'-e',
'--external',
846 action=
'store_false',
847 help=
'''Suppress display of external work in graphs.''')
848 parser.add_argument(
'-o',
'--order',
850 help=
'''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
851 parser.add_argument(
'-t',
'--timings',
853 help=
'''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary filea as a json file modules-timings.json.''')
854 parser.add_argument(
'-l',
'--lowerxaxis',
857 help=
'''Lower limit of x axis, default 0, not used if upper limit not set''')
858 parser.add_argument(
'-u',
'--upperxaxis',
860 help=
'''Upper limit of x axis, if not set then x axis limits are set automatically''')
861 args = parser.parse_args()
864 inputFile = args.filename
866 shownStacks = args.stack
867 displayExternalWork = args.external
868 checkOrder = args.order
869 doModuleTimings =
False
871 doModuleTimings =
True
875 if args.upperxaxis
is not None:
877 xUpper = args.upperxaxis
878 xLower = args.lowerxaxis
881 if pdfFile
is not None:
885 matplotlib.use(
"PDF")
886 import matplotlib.pyplot
as plt
887 if not re.match(
r'^[\w\.]+$', pdfFile):
888 print(
"Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile))
889 print(
"Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
893 extension = pdfFile.split(
'.')[-1]
894 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
895 if not extension
in supported_filetypes:
896 print(
"A graph cannot be saved to a filename with extension '{}'.".
format(extension))
897 print(
"The allowed extensions are:")
898 for filetype
in supported_filetypes:
902 if pdfFile
is None and shownStacks:
903 print(
"The -s (--stack) option can be used only when the -g (--graph) option is specified.")
906 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
910 sys.stderr.write(
">processing data\n")
915 sys.stderr.write(
">preparing ASCII art\n")
916 createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
918 sys.stderr.write(
">creating PDF\n")
919 createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
922 sys.stderr.write(
">creating module-timings.json\n")