2 from __future__
import print_function
3 from builtins
import range
4 from itertools
import groupby
5 from operator
import attrgetter,itemgetter
7 from collections
import defaultdict
11 To Use: Add the StallMonitor Service to the cmsRun job you want to check for
12 stream stalls. Use something like this in the configuration:
14 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))
16 After running the job, execute this script and pass the name of the
17 StallMonitor log file to the script.
19 By default, the script will then print an 'ASCII art' stall graph
20 which consists of a line of text for each time a module or the
21 source stops or starts. Each line contains the name of the module
22 which either started or stopped running, and the number of modules
23 running on each stream at that moment in time. After that will be
24 the time and stream number. Then if a module just started, you
25 will also see the amount of time the module spent between finishing
26 its prefetching and starting. The state of a module is represented
29 plus ("+") the stream has just finished waiting and is starting a module
30 minus ("-") the stream just finished running a module
32 If a module had to wait more than 0.1 seconds, the end of the line
33 will have "STALLED". Startup actions, e.g. reading conditions,
34 may affect results for the first few events.
36 Using the command line arguments described above you can make the
37 program create a PDF file with actual graphs instead of the 'ASCII art'
40 Once the graph is completed, the program outputs the list of modules
41 which had the greatest total stall times. The list is sorted by
42 total stall time and written in descending order. In addition, the
43 list of all stall times for the module is given.
45 There is an inferior alternative (an old obsolete way).
46 Instead of using the StallMonitor Service, you can use the
47 Tracer Service. Make sure to use the 'printTimestamps' option
48 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
49 There are problems associated with this and it is not recommended.'''
52 kStallThreshold=100000
63 kStartedSourceDelayedRead=7
64 kFinishedSourceDelayedRead=8
67 kSourceFindEvent =
"sourceFindEvent"
68 kSourceDelayedRead =
"sourceDelayedRead"
74 if not l
or l[0] ==
'#':
76 (step,payload) = tuple(l.split(
None,1))
77 payload=payload.split()
80 if step ==
'E' or step ==
'e':
85 stream = int(payload[0])
86 time = int(payload[-1])
93 if step ==
'S' or step ==
's':
94 name = kSourceFindEvent
95 trans = kStartedSource
98 trans = kFinishedSource
101 moduleID = payload[1]
106 if step ==
'p' or step ==
'M' or step ==
'm':
112 if step ==
'm' or step ==
'M':
113 isEvent = (int(payload[2]) == 0)
114 name = moduleNames[moduleID]
118 elif step ==
'A' or step ==
'a':
119 trans = kStartedAcquire
121 trans = kFinishedAcquire
122 name = moduleNames[moduleID]
127 elif step ==
'R' or step == 'r':
128 trans = kStartedSourceDelayedRead
130 trans = kFinishedSourceDelayedRead
131 name = kSourceDelayedRead
133 if trans
is not None:
134 yield (name,trans,stream,time, isEvent)
141 numStreamsFromSource = 0
145 if l
and l[0] ==
'M':
149 numStreams = int(i[1])+1
151 if numStreams == 0
and l
and l[0] ==
'S':
152 s = int(l.split(
' ')[1])
153 if s > numStreamsFromSource:
154 numStreamsFromSource = s
155 if len(l) > 5
and l[0:2] ==
"#M":
156 (id,name)=tuple(l[2:].
split())
157 moduleNames[id] = name
161 numStreams = numStreamsFromSource +1
165 for n
in moduleNames.items():
170 """Create a generator which can step through the file and return each processing step.
171 Using a generator reduces the memory overhead when parsing a large file.
179 time = line.split(
" ")[1]
180 time = time.split(
":")
181 time = int(time[0])*60*60+int(time[1])*60+float(time[2])
182 time = int(1000000*time)
213 streamsThatSawFirstEvent = set()
221 if l.find(
"processing event :") != -1:
222 name = kSourceFindEvent
223 trans = kStartedSource
225 if l.find(
"starting:") != -1:
226 trans = kFinishedSource
227 elif l.find(
"processing event for module") != -1:
229 if l.find(
"finished:") != -1:
230 if l.find(
"prefetching") != -1:
235 if l.find(
"prefetching") != -1:
238 name = l.split(
"'")[1]
239 elif l.find(
"processing event acquire for module:") != -1:
240 trans = kStartedAcquire
241 if l.find(
"finished:") != -1:
242 trans = kFinishedAcquire
243 name = l.split(
"'")[1]
244 elif l.find(
"event delayed read from source") != -1:
245 trans = kStartedSourceDelayedRead
246 if l.find(
"finished:") != -1:
247 trans = kFinishedSourceDelayedRead
248 name = kSourceDelayedRead
249 if trans
is not None:
253 time = time - startTime
254 streamIndex = l.find(
"stream = ")
255 stream = int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
256 maxNameSize =
max(maxNameSize, len(name))
258 if trans == kFinishedSource
and not stream
in streamsThatSawFirstEvent:
261 processingSteps.append((name,kStartedSource,stream,time,
True))
262 streamsThatSawFirstEvent.add(stream)
264 processingSteps.append((name,trans,stream,time,
True))
265 numStreams =
max(numStreams, stream+1)
268 return (processingSteps,numStreams,maxNameSize)
274 return self._processingSteps
279 firstLine = inputFile.readline().rstrip()
283 fifthLine = inputFile.readline().rstrip()
285 if (firstLine.find(
"# Transition") != -1)
or (firstLine.find(
"# Step") != -1):
286 print(
"> ... Parsing StallMonitor output.")
287 return StallMonitorParser
289 if firstLine.find(
"++") != -1
or fifthLine.find(
"++") != -1:
292 print(
"> ... Parsing Tracer output.")
296 print(
"Unknown input format.")
302 return parseInput(inputFile)
316 streamTime = [0]*numStreams
317 streamState = [0]*numStreams
319 modulesActiveOnStream = [{}
for x
in range(numStreams)]
320 for n,trans,s,time,isEvent
in processingSteps:
323 modulesOnStream = modulesActiveOnStream[s]
324 if trans == kPrefetchEnd:
325 modulesOnStream[n] = time
326 elif trans == kStarted
or trans == kStartedAcquire:
327 if n
in modulesOnStream:
328 waitTime = time - modulesOnStream[n]
329 modulesOnStream.pop(n,
None)
331 elif trans == kFinished
or trans == kFinishedAcquire:
334 elif trans == kStartedSourceDelayedRead:
335 if streamState[s] == 0:
336 waitTime = time - streamTime[s]
337 elif trans == kStartedSource:
338 modulesOnStream.clear()
339 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
341 if waitTime
is not None:
342 if waitTime > kStallThreshold:
343 t = stalledModules.setdefault(n,[])
345 return stalledModules
350 streamTime = [0]*numStreams
351 streamState = [0]*numStreams
352 moduleTimings = defaultdict(list)
353 modulesActiveOnStream = [defaultdict(int)
for x
in range(numStreams)]
354 for n,trans,s,time,isEvent
in processingSteps:
356 modulesOnStream = modulesActiveOnStream[s]
358 if trans == kStarted:
360 modulesOnStream[n]=time
361 elif trans == kFinished:
362 waitTime = time - modulesOnStream[n]
363 modulesOnStream.pop(n,
None)
365 moduleTimings[n].
append(float(waitTime/1000.))
367 with open(
'module-timings.json',
'w')
as outfile:
368 outfile.write(json.dumps(moduleTimings, indent=4))
372 streamTime = [0]*numStreams
373 streamState = [0]*numStreams
374 modulesActiveOnStreams = [{}
for x
in range(numStreams)]
375 for n,trans,s,time,isEvent
in processingSteps:
377 modulesActiveOnStream = modulesActiveOnStreams[s]
378 if trans == kPrefetchEnd:
379 modulesActiveOnStream[n] = time
381 elif trans == kStartedAcquire
or trans == kStarted:
382 if n
in modulesActiveOnStream:
383 waitTime = time - modulesActiveOnStream[n]
384 modulesActiveOnStream.pop(n,
None)
386 elif trans == kFinishedAcquire
or trans == kFinished:
389 elif trans == kStartedSourceDelayedRead:
390 if streamState[s] == 0:
391 waitTime = time - streamTime[s]
392 elif trans == kStartedSource:
393 modulesActiveOnStream.clear()
394 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
396 states =
"%-*s: " % (maxNameSize,n)
397 if trans == kStartedAcquire
or trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedSource:
401 for index, state
in enumerate(streamState):
402 if n==kSourceFindEvent
and index == s:
405 states +=
str(state)+
" "
406 states +=
" -- " +
str(time/1000.) +
" " +
str(s) +
" "
407 if waitTime
is not None:
408 states +=
" %.2f"% (waitTime/1000.)
409 if waitTime > kStallThreshold:
418 for name,t
in stalledModules.items():
419 maxNameSize =
max(maxNameSize, len(name))
421 priorities.append((name,sum(t),t))
423 priorities.sort(key=
lambda a: a[1], reverse=
True)
425 nameColumn =
"Stalled Module"
426 maxNameSize =
max(maxNameSize, len(nameColumn))
428 stallColumn =
"Tot Stall Time"
429 stallColumnLength = len(stallColumn)
431 print(
"%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times")
432 for n,s,t
in priorities:
433 paddedName =
"%-*s:" % (maxNameSize,n)
434 print(paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t]))
443 return "(x: {}, y: {})".
format(self.
x,self.
y)
453 tmp =
Point(ps[0].x, ps[0].y)
458 reducedPoints.append(tmp)
459 tmp =
Point(p.x, p.y)
460 reducedPoints.append(tmp)
461 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
467 for pairList
in pairLists:
468 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
469 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
470 points.sort(key=attrgetter(
'x'))
482 if len(self.
data) != 0:
483 tmp += self.
data[-1][1]
485 tmp.sort(key=attrgetter(
'x'))
487 self.data.append((graphType, tmp))
504 oldStreamInfo = streamInfo
505 streamInfo = [[]
for x
in range(numStreams)]
507 for s
in range(numStreams):
509 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
510 for info
in oldStreamInfo[s][1:]:
511 start,length,color = info.unpack()
512 if color == lastColor
and lastStartTime+lastTimeLength == start:
513 lastTimeLength += length
516 lastStartTime = start
517 lastTimeLength = length
534 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
535 for start,length,height
in oldBlocks[1:]:
536 if height == lastHeight
and lastStartTime+lastTimeLength == start:
537 lastTimeLength += length
539 blocks.append((lastStartTime,lastTimeLength,lastHeight))
540 lastStartTime = start
541 lastTimeLength = length
543 blocks.append((lastStartTime,lastTimeLength,lastHeight))
549 points = sorted(points, key=attrgetter(
'x'))
553 for t1,t2
in zip(points, points[1:]):
561 if streamHeight < streamHeightCut:
563 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
564 preparedTimes.sort(key=itemgetter(2))
567 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
568 theTS = [(t[0],t[1])
for t
in ts]
570 theTimes = [(t[0]/1000000.,t[1]/1000000.)
for t
in theTS]
571 yspan = (stream-0.4+height,height*(nthreads-1))
572 ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
574 allStackTimes[color].extend(theTS*(nthreads-threadOffset))
577 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
579 stalledModuleNames = set([x
for x
in iter(stalledModuleInfo)])
580 streamLowestRow = [[]
for x
in range(numStreams)]
581 modulesActiveOnStreams = [set()
for x
in range(numStreams)]
582 acquireActiveOnStreams = [set()
for x
in range(numStreams)]
583 externalWorkOnStreams = [set()
for x
in range(numStreams)]
584 previousFinishTime = [
None for x
in range(numStreams)]
585 streamRunningTimes = [[]
for x
in range(numStreams)]
586 streamExternalWorkRunningTimes = [[]
for x
in range(numStreams)]
587 maxNumberOfConcurrentModulesOnAStream = 1
588 externalWorkModulesInJob =
False
589 previousTime = [0
for x
in range(numStreams)]
592 finishBeforeStart = [set()
for x
in range(numStreams)]
593 finishAcquireBeforeStart = [set()
for x
in range(numStreams)]
594 countSource = [0
for x
in range(numStreams)]
595 countDelayedSource = [0
for x
in range(numStreams)]
596 countExternalWork = [defaultdict(int)
for x
in range(numStreams)]
599 for n,trans,s,time,isEvent
in processingSteps:
600 if timeOffset
is None:
605 if time < previousTime[s]:
606 time = previousTime[s]
607 previousTime[s] = time
609 activeModules = modulesActiveOnStreams[s]
610 acquireModules = acquireActiveOnStreams[s]
611 externalWorkModules = externalWorkOnStreams[s]
613 if trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedAcquire
or trans == kStartedSource :
620 if trans == kStarted:
621 countExternalWork[s][n] -= 1
622 if n
in finishBeforeStart[s]:
623 finishBeforeStart[s].
remove(n)
625 elif trans == kStartedAcquire:
626 if n
in finishAcquireBeforeStart[s]:
627 finishAcquireBeforeStart[s].
remove(n)
630 if trans == kStartedSourceDelayedRead:
631 countDelayedSource[s] += 1
632 if countDelayedSource[s] < 1:
634 elif trans == kStartedSource:
636 if countSource[s] < 1:
639 moduleNames = activeModules.copy()
640 moduleNames.update(acquireModules)
641 if trans == kStartedAcquire:
642 acquireModules.add(n)
646 if moduleNames
or externalWorkModules:
647 startTime = previousFinishTime[s]
648 previousFinishTime[s] = time
650 if trans == kStarted
and n
in externalWorkModules:
651 externalWorkModules.remove(n)
652 streamExternalWorkRunningTimes[s].
append(
Point(time, -1))
654 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
655 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
656 elif trans == kFinished
or trans == kFinishedSourceDelayedRead
or trans == kFinishedAcquire
or trans == kFinishedSource :
658 if trans == kFinished:
659 if n
not in activeModules:
660 finishBeforeStart[s].
add(n)
663 if trans == kFinishedSourceDelayedRead:
664 countDelayedSource[s] -= 1
665 if countDelayedSource[s] < 0:
667 elif trans == kFinishedSource:
669 if countSource[s] < 0:
672 if trans == kFinishedAcquire:
674 countExternalWork[s][n] += 1
675 if displayExternalWork:
676 externalWorkModulesInJob =
True
677 if (
not checkOrder)
or countExternalWork[s][n] > 0:
678 externalWorkModules.add(n)
679 streamExternalWorkRunningTimes[s].
append(
Point(time,+1))
680 if checkOrder
and n
not in acquireModules:
681 finishAcquireBeforeStart[s].
add(n)
684 startTime = previousFinishTime[s]
685 previousFinishTime[s] = time
686 moduleNames = activeModules.copy()
687 moduleNames.update(acquireModules)
689 if trans == kFinishedAcquire:
690 acquireModules.remove(n)
691 elif trans == kFinishedSourceDelayedRead:
692 if countDelayedSource[s] == 0:
693 activeModules.remove(n)
694 elif trans == kFinishedSource:
695 if countSource[s] == 0:
696 activeModules.remove(n)
698 activeModules.remove(n)
700 if startTime
is not None:
706 elif (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
709 for n
in moduleNames:
710 if n
in stalledModuleNames:
719 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
722 [xH,yH] = fig.get_size_inches()
723 fig.set_size_inches(xH,yH*4/3)
724 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
725 axStack = plt.subplot2grid((4,1),(3,0))
727 ax.set_xlabel(
"Time (sec)")
728 ax.set_ylabel(
"Stream ID")
729 ax.set_ylim(-0.5,numStreams-0.5)
730 ax.yaxis.set_ticks(
range(numStreams))
732 ax.set_xlim((xLower, xUpper))
734 height = 0.8/maxNumberOfConcurrentModulesOnAStream
735 allStackTimes={
'green': [],
'limegreen':[],
'red': [],
'blue': [],
'orange': [],
'darkviolet': []}
736 for iStream,lowestRow
in enumerate(streamLowestRow):
737 times=[(x.begin/1000000., x.delta/1000000.)
for x
in lowestRow]
738 colors=[x.color
for x
in lowestRow]
740 ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
743 for info
in lowestRow:
744 if not info.color ==
'darkviolet':
745 allStackTimes[info.color].
append((info.begin, info.delta))
748 if maxNumberOfConcurrentModulesOnAStream > 1
or externalWorkModulesInJob:
750 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
752 perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
753 perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
756 allStackTimes, ax, i, height,
759 addToStackTimes=
False,
764 allStackTimes, ax, i, height,
767 addToStackTimes=
True,
772 allStackTimes, ax, i, height,
775 addToStackTimes=
True,
780 print(
"> ... Generating stack")
782 for color
in [
'green',
'limegreen',
'blue',
'red',
'orange',
'darkviolet']:
783 tmp = allStackTimes[color]
785 stack.update(color, tmp)
787 for stk
in reversed(stack.data):
793 for p1,p2
in zip(stk[1], stk[1][1:]):
795 xs.append((p1.x, p2.x-p1.x, height))
796 xs.sort(key = itemgetter(2))
799 for height, xpairs
in groupby(xs, itemgetter(2)):
800 finalxs = [(e[0]/1000000.,e[1]/1000000.)
for e
in xpairs]
802 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
804 axStack.set_xlabel(
"Time (sec)");
805 axStack.set_ylabel(
"# modules");
806 axStack.set_xlim(ax.get_xlim())
807 axStack.tick_params(top=
'off')
809 fig.text(0.1, 0.95,
"modules running event", color =
"green", horizontalalignment =
'left')
810 fig.text(0.1, 0.92,
"modules running other", color =
"limegreen", horizontalalignment =
'left')
811 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
812 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
813 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
814 if displayExternalWork:
815 fig.text(0.9, 0.92,
"external work", color =
"darkviolet", horizontalalignment =
'right')
816 print(
"> ... Saving to file: '{}'".
format(pdfFile))
820 if __name__==
"__main__":
826 parser = argparse.ArgumentParser(description=
'Convert a text file created by cmsRun into a stream stall graph.',
827 formatter_class=argparse.RawDescriptionHelpFormatter,
829 parser.add_argument(
'filename',
830 type=argparse.FileType(
'r'), # open file
831 help='file to process')
832 parser.add_argument(
'-g',
'--graph',
834 metavar=
"'stall.pdf'",
837 help=
'''Create pdf file of stream stall graph. If -g is specified
838 by itself, the default file name is \'stall.pdf\'. Otherwise, the
839 argument to the -g option is the filename.''')
840 parser.add_argument(
'-s',
'--stack',
842 help=
'''Create stack plot, combining all stream-specific info.
843 Can be used only when -g is specified.''')
844 parser.add_argument(
'-e',
'--external',
845 action=
'store_false',
846 help=
'''Suppress display of external work in graphs.''')
847 parser.add_argument(
'-o',
'--order',
849 help=
'''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
850 parser.add_argument(
'-t',
'--timings',
852 help=
'''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary filea as a json file modules-timings.json.''')
853 parser.add_argument(
'-l',
'--lowerxaxis',
856 help=
'''Lower limit of x axis, default 0, not used if upper limit not set''')
857 parser.add_argument(
'-u',
'--upperxaxis',
859 help=
'''Upper limit of x axis, if not set then x axis limits are set automatically''')
860 args = parser.parse_args()
863 inputFile = args.filename
865 shownStacks = args.stack
866 displayExternalWork = args.external
867 checkOrder = args.order
868 doModuleTimings =
False
870 doModuleTimings =
True
874 if args.upperxaxis
is not None:
876 xUpper = args.upperxaxis
877 xLower = args.lowerxaxis
880 if pdfFile
is not None:
884 matplotlib.use(
"PDF")
885 import matplotlib.pyplot
as plt
886 if not re.match(
r'^[\w\.]+$', pdfFile):
887 print(
"Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile))
888 print(
"Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
892 extension = pdfFile.split(
'.')[-1]
893 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
894 if not extension
in supported_filetypes:
895 print(
"A graph cannot be saved to a filename with extension '{}'.".
format(extension))
896 print(
"The allowed extensions are:")
897 for filetype
in supported_filetypes:
901 if pdfFile
is None and shownStacks:
902 print(
"The -s (--stack) option can be used only when the -g (--graph) option is specified.")
905 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
909 sys.stderr.write(
">processing data\n")
914 sys.stderr.write(
">preparing ASCII art\n")
915 createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
917 sys.stderr.write(
">creating PDF\n")
918 createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
921 sys.stderr.write(
">creating module-timings.json\n")
def processingStepsFromStallMonitorOutput
std::pair< unsigned int, unsigned int > unpack(cond::Time_t since)
boost::dynamic_bitset append(const boost::dynamic_bitset<> &bs1, const boost::dynamic_bitset<> &bs2)
this method takes two bitsets bs1 and bs2 and returns result of bs2 appended to the end of bs1 ...
def consolidateContiguousBlocks
const uint16_t range(const Frame &aFrame)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def mergeContiguousBlocks
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
def printStalledModulesInOrder
def plotPerStreamAboveFirstAndPrepareStack