2 from __future__
import print_function
3 from itertools
import groupby
4 from operator
import attrgetter,itemgetter
6 from collections
import defaultdict
12 To Use: Add the StallMonitor Service to the cmsRun job you want to check for 13 stream stalls. Use something like this in the configuration: 15 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log"))) 17 After running the job, execute this script and pass the name of the 18 StallMonitor log file to the script. 20 By default, the script will then print an 'ASCII art' stall graph 21 which consists of a line of text for each time a module or the 22 source stops or starts. Each line contains the name of the module 23 which either started or stopped running, and the number of modules 24 running on each stream at that moment in time. After that will be 25 the time and stream number. Then if a module just started, you 26 will also see the amount of time the module spent between finishing 27 its prefetching and starting. The state of a module is represented 30 plus ("+") the stream has just finished waiting and is starting a module 31 minus ("-") the stream just finished running a module 33 If a module had to wait more than 0.1 seconds, the end of the line 34 will have "STALLED". Startup actions, e.g. reading conditions, 35 may affect results for the first few events. 37 Using the command line arguments described above you can make the 38 program create a PDF file with actual graphs instead of the 'ASCII art' 41 Once the graph is completed, the program outputs the list of modules 42 which had the greatest total stall times. The list is sorted by 43 total stall time and written in descending order. In addition, the 44 list of all stall times for the module is given. 46 There is an inferior alternative (an old obsolete way). 47 Instead of using the StallMonitor Service, you can use the 48 Tracer Service. Make sure to use the 'printTimestamps' option 49 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True)) 50 There are problems associated with this and it is not recommended.''' 64 kStartedSourceDelayedRead=7
65 kFinishedSourceDelayedRead=8
68 kSourceFindEvent =
"sourceFindEvent" 69 kSourceDelayedRead =
"sourceDelayedRead" 75 if not l
or l[0] ==
'#':
77 (step,payload) = tuple(l.split(
None,1))
78 payload=payload.split()
81 if step ==
'E' or step ==
'e':
86 stream =
int(payload[0])
87 time =
int(payload[-1])
94 if step ==
'S' or step ==
's':
95 name = kSourceFindEvent
96 trans = kStartedSource
99 trans = kFinishedSource
102 moduleID = payload[1]
107 if step ==
'p' or step ==
'M' or step ==
'm':
113 if step ==
'm' or step ==
'M':
114 isEvent = (
int(payload[2]) == 0)
115 name = moduleNames[moduleID]
119 elif step ==
'A' or step ==
'a':
120 trans = kStartedAcquire
122 trans = kFinishedAcquire
123 name = moduleNames[moduleID]
128 elif step ==
'R' or step == 'r': 129 trans = kStartedSourceDelayedRead 131 trans = kFinishedSourceDelayedRead 132 name = kSourceDelayedRead 134 if trans
is not None:
135 yield (name,trans,stream,time, isEvent)
142 numStreamsFromSource = 0
146 if l
and l[0] ==
'M':
150 numStreams =
int(i[1])+1
152 if numStreams == 0
and l
and l[0] ==
'S':
153 s =
int(l.split(
' ')[1])
154 if s > numStreamsFromSource:
155 numStreamsFromSource = s
156 if len(l) > 5
and l[0:2] ==
"#M":
157 (id,name)=tuple(l[2:].
split())
158 moduleNames[id] = name
162 numStreams = numStreamsFromSource +1
166 for n
in six.iteritems(moduleNames):
171 """Create a generator which can step through the file and return each processing step. 172 Using a generator reduces the memory overhead when parsing a large file. 180 time = line.split(
" ")[1]
181 time = time.split(
":")
182 time =
int(time[0])*60*60+
int(time[1])*60+
float(time[2])
183 time =
int(1000*time)
214 streamsThatSawFirstEvent = set()
222 if l.find(
"processing event :") != -1:
223 name = kSourceFindEvent
224 trans = kStartedSource
226 if l.find(
"starting:") != -1:
227 trans = kFinishedSource
228 elif l.find(
"processing event for module") != -1:
230 if l.find(
"finished:") != -1:
231 if l.find(
"prefetching") != -1:
236 if l.find(
"prefetching") != -1:
239 name = l.split(
"'")[1]
240 elif l.find(
"processing event acquire for module:") != -1:
241 trans = kStartedAcquire
242 if l.find(
"finished:") != -1:
243 trans = kFinishedAcquire
244 name = l.split(
"'")[1]
245 elif l.find(
"event delayed read from source") != -1:
246 trans = kStartedSourceDelayedRead
247 if l.find(
"finished:") != -1:
248 trans = kFinishedSourceDelayedRead
249 name = kSourceDelayedRead
250 if trans
is not None:
254 time = time - startTime
255 streamIndex = l.find(
"stream = ")
256 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
257 maxNameSize =
max(maxNameSize, len(name))
259 if trans == kFinishedSource
and not stream
in streamsThatSawFirstEvent:
262 processingSteps.append((name,kStartedSource,stream,time,
True))
263 streamsThatSawFirstEvent.add(stream)
265 processingSteps.append((name,trans,stream,time,
True))
266 numStreams =
max(numStreams, stream+1)
269 return (processingSteps,numStreams,maxNameSize)
275 return self._processingSteps
280 firstLine = inputFile.readline().rstrip()
284 fifthLine = inputFile.readline().rstrip()
286 if (firstLine.find(
"# Transition") != -1)
or (firstLine.find(
"# Step") != -1):
287 print(
"> ... Parsing StallMonitor output.")
288 return StallMonitorParser
290 if firstLine.find(
"++") != -1
or fifthLine.find(
"++") != -1:
293 print(
"> ... Parsing Tracer output.")
297 print(
"Unknown input format.")
303 return parseInput(inputFile)
317 streamTime = [0]*numStreams
318 streamState = [0]*numStreams
320 modulesActiveOnStream = [{}
for x
in xrange(numStreams)]
321 for n,trans,s,time,isEvent
in processingSteps:
324 modulesOnStream = modulesActiveOnStream[s]
325 if trans == kPrefetchEnd:
326 modulesOnStream[n] = time
327 elif trans == kStarted
or trans == kStartedAcquire:
328 if n
in modulesOnStream:
329 waitTime = time - modulesOnStream[n]
330 modulesOnStream.pop(n,
None)
332 elif trans == kFinished
or trans == kFinishedAcquire:
335 elif trans == kStartedSourceDelayedRead:
336 if streamState[s] == 0:
337 waitTime = time - streamTime[s]
338 elif trans == kStartedSource:
339 modulesOnStream.clear()
340 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
342 if waitTime
is not None:
343 if waitTime > kStallThreshold:
344 t = stalledModules.setdefault(n,[])
346 return stalledModules
350 streamTime = [0]*numStreams
351 streamState = [0]*numStreams
352 modulesActiveOnStreams = [{}
for x
in xrange(numStreams)]
353 for n,trans,s,time,isEvent
in processingSteps:
355 modulesActiveOnStream = modulesActiveOnStreams[s]
356 if trans == kPrefetchEnd:
357 modulesActiveOnStream[n] = time
359 elif trans == kStartedAcquire
or trans == kStarted:
360 if n
in modulesActiveOnStream:
361 waitTime = time - modulesActiveOnStream[n]
362 modulesActiveOnStream.pop(n,
None)
364 elif trans == kFinishedAcquire
or trans == kFinished:
367 elif trans == kStartedSourceDelayedRead:
368 if streamState[s] == 0:
369 waitTime = time - streamTime[s]
370 elif trans == kStartedSource:
371 modulesActiveOnStream.clear()
372 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
374 states =
"%-*s: " % (maxNameSize,n)
375 if trans == kStartedAcquire
or trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedSource:
379 for index, state
in enumerate(streamState):
380 if n==kSourceFindEvent
and index == s:
383 states +=
str(state)+
" " 384 states +=
" -- " +
str(time/1000.) +
" " +
str(s) +
" " 385 if waitTime
is not None:
386 states +=
" %.2f"% (waitTime/1000.)
387 if waitTime > kStallThreshold:
396 for name,t
in six.iteritems(stalledModules):
397 maxNameSize =
max(maxNameSize, len(name))
399 priorities.append((name,sum(t),t))
402 return cmp(i[1],j[1])
403 priorities.sort(cmp=sumSort, reverse=
True)
405 nameColumn =
"Stalled Module" 406 maxNameSize =
max(maxNameSize, len(nameColumn))
408 stallColumn =
"Tot Stall Time" 409 stallColumnLength = len(stallColumn)
411 print(
"%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times")
412 for n,s,t
in priorities:
413 paddedName =
"%-*s:" % (maxNameSize,n)
414 print(paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t]))
423 return "(x: {}, y: {})".
format(self.
x,self.
y)
433 tmp =
Point(ps[0].x, ps[0].y)
438 reducedPoints.append(tmp)
439 tmp =
Point(p.x, p.y)
440 reducedPoints.append(tmp)
441 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
447 for pairList
in pairLists:
448 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
449 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
450 points.sort(key=attrgetter(
'x'))
462 if len(self.
data) != 0:
463 tmp += self.
data[-1][1]
465 tmp.sort(key=attrgetter(
'x'))
467 self.data.append((graphType, tmp))
484 oldStreamInfo = streamInfo
485 streamInfo = [[]
for x
in xrange(numStreams)]
487 for s
in xrange(numStreams):
489 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
490 for info
in oldStreamInfo[s][1:]:
491 start,length,color = info.unpack()
492 if color == lastColor
and lastStartTime+lastTimeLength == start:
493 lastTimeLength += length
496 lastStartTime = start
497 lastTimeLength = length
514 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
515 for start,length,height
in oldBlocks[1:]:
516 if height == lastHeight
and lastStartTime+lastTimeLength == start:
517 lastTimeLength += length
519 blocks.append((lastStartTime,lastTimeLength,lastHeight))
520 lastStartTime = start
521 lastTimeLength = length
523 blocks.append((lastStartTime,lastTimeLength,lastHeight))
529 points = sorted(points, key=attrgetter(
'x'))
533 for t1,t2
in zip(points, points[1:]):
541 if streamHeight < streamHeightCut:
543 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
544 preparedTimes.sort(key=itemgetter(2))
547 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
548 theTS = [(t[0],t[1])
for t
in ts]
550 theTimes = [(t[0]/1000.,t[1]/1000.)
for t
in theTS]
551 yspan = (stream-0.4+height,height*(nthreads-1))
552 ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
554 allStackTimes[color].extend(theTS*(nthreads-threadOffset))
557 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
559 stalledModuleNames = set([x
for x
in stalledModuleInfo.iterkeys()])
560 streamLowestRow = [[]
for x
in xrange(numStreams)]
561 modulesActiveOnStreams = [set()
for x
in xrange(numStreams)]
562 acquireActiveOnStreams = [set()
for x
in xrange(numStreams)]
563 externalWorkOnStreams = [set()
for x
in xrange(numStreams)]
564 previousFinishTime = [
None for x
in xrange(numStreams)]
565 streamRunningTimes = [[]
for x
in xrange(numStreams)]
566 streamExternalWorkRunningTimes = [[]
for x
in xrange(numStreams)]
567 maxNumberOfConcurrentModulesOnAStream = 1
568 externalWorkModulesInJob =
False 569 previousTime = [0
for x
in xrange(numStreams)]
572 finishBeforeStart = [set()
for x
in xrange(numStreams)]
573 finishAcquireBeforeStart = [set()
for x
in xrange(numStreams)]
574 countSource = [0
for x
in xrange(numStreams)]
575 countDelayedSource = [0
for x
in xrange(numStreams)]
576 countExternalWork = [defaultdict(int)
for x
in xrange(numStreams)]
579 for n,trans,s,time,isEvent
in processingSteps:
580 if timeOffset
is None:
585 if time < previousTime[s]:
586 time = previousTime[s]
587 previousTime[s] = time
589 activeModules = modulesActiveOnStreams[s]
590 acquireModules = acquireActiveOnStreams[s]
591 externalWorkModules = externalWorkOnStreams[s]
593 if trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedAcquire
or trans == kStartedSource :
600 if trans == kStarted:
601 countExternalWork[s][n] -= 1
602 if n
in finishBeforeStart[s]:
603 finishBeforeStart[s].
remove(n)
605 elif trans == kStartedAcquire:
606 if n
in finishAcquireBeforeStart[s]:
607 finishAcquireBeforeStart[s].
remove(n)
610 if trans == kStartedSourceDelayedRead:
611 countDelayedSource[s] += 1
612 if countDelayedSource[s] < 1:
614 elif trans == kStartedSource:
616 if countSource[s] < 1:
619 moduleNames = activeModules.copy()
620 moduleNames.update(acquireModules)
621 if trans == kStartedAcquire:
622 acquireModules.add(n)
626 if moduleNames
or externalWorkModules:
627 startTime = previousFinishTime[s]
628 previousFinishTime[s] = time
630 if trans == kStarted
and n
in externalWorkModules:
631 externalWorkModules.remove(n)
632 streamExternalWorkRunningTimes[s].
append(
Point(time, -1))
634 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
635 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
636 elif trans == kFinished
or trans == kFinishedSourceDelayedRead
or trans == kFinishedAcquire
or trans == kFinishedSource :
638 if trans == kFinished:
639 if n
not in activeModules:
640 finishBeforeStart[s].
add(n)
643 if trans == kFinishedSourceDelayedRead:
644 countDelayedSource[s] -= 1
645 if countDelayedSource[s] < 0:
647 elif trans == kFinishedSource:
649 if countSource[s] < 0:
652 if trans == kFinishedAcquire:
654 countExternalWork[s][n] += 1
655 if displayExternalWork:
656 externalWorkModulesInJob =
True 657 if (
not checkOrder)
or countExternalWork[s][n] > 0:
658 externalWorkModules.add(n)
659 streamExternalWorkRunningTimes[s].
append(
Point(time,+1))
660 if checkOrder
and n
not in acquireModules:
661 finishAcquireBeforeStart[s].
add(n)
664 startTime = previousFinishTime[s]
665 previousFinishTime[s] = time
666 moduleNames = activeModules.copy()
667 moduleNames.update(acquireModules)
669 if trans == kFinishedAcquire:
670 acquireModules.remove(n)
671 elif trans == kFinishedSourceDelayedRead:
672 if countDelayedSource[s] == 0:
673 activeModules.remove(n)
674 elif trans == kFinishedSource:
675 if countSource[s] == 0:
676 activeModules.remove(n)
678 activeModules.remove(n)
680 if startTime
is not None:
686 elif (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
689 for n
in moduleNames:
690 if n
in stalledModuleNames:
699 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
702 [xH,yH] = fig.get_size_inches()
703 fig.set_size_inches(xH,yH*4/3)
704 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
705 axStack = plt.subplot2grid((4,1),(3,0))
707 ax.set_xlabel(
"Time (sec)")
708 ax.set_ylabel(
"Stream ID")
709 ax.set_ylim(-0.5,numStreams-0.5)
710 ax.yaxis.set_ticks(xrange(numStreams))
712 height = 0.8/maxNumberOfConcurrentModulesOnAStream
713 allStackTimes={
'green': [],
'limegreen':[],
'red': [],
'blue': [],
'orange': [],
'darkviolet': []}
714 for iStream,lowestRow
in enumerate(streamLowestRow):
715 times=[(x.begin/1000., x.delta/1000.)
for x
in lowestRow]
716 colors=[x.color
for x
in lowestRow]
718 ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
721 for info
in lowestRow:
722 if not info.color ==
'darkviolet':
723 allStackTimes[info.color].
append((info.begin, info.delta))
726 if maxNumberOfConcurrentModulesOnAStream > 1
or externalWorkModulesInJob:
728 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
730 perStreamTimesWithExtendedWork =
list(perStreamRunningTimes)
731 perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
734 allStackTimes, ax, i, height,
737 addToStackTimes=
False,
742 allStackTimes, ax, i, height,
745 addToStackTimes=
True,
750 allStackTimes, ax, i, height,
753 addToStackTimes=
True,
758 print(
"> ... Generating stack")
760 for color
in [
'green',
'limegreen',
'blue',
'red',
'orange',
'darkviolet']:
761 tmp = allStackTimes[color]
763 stack.update(color, tmp)
765 for stk
in reversed(stack.data):
771 for p1,p2
in zip(stk[1], stk[1][1:]):
773 xs.append((p1.x, p2.x-p1.x, height))
774 xs.sort(key = itemgetter(2))
777 for height, xpairs
in groupby(xs, itemgetter(2)):
778 finalxs = [(e[0]/1000.,e[1]/1000.)
for e
in xpairs]
780 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
782 axStack.set_xlabel(
"Time (sec)");
783 axStack.set_ylabel(
"# modules");
784 axStack.set_xlim(ax.get_xlim())
785 axStack.tick_params(top=
'off')
787 fig.text(0.1, 0.95,
"modules running event", color =
"green", horizontalalignment =
'left')
788 fig.text(0.1, 0.92,
"modules running other", color =
"limegreen", horizontalalignment =
'left')
789 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
790 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
791 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
792 if displayExternalWork:
793 fig.text(0.9, 0.92,
"external work", color =
"darkviolet", horizontalalignment =
'right')
794 print(
"> ... Saving to file: '{}'".
format(pdfFile))
798 if __name__==
"__main__":
804 parser = argparse.ArgumentParser(description=
'Convert a text file created by cmsRun into a stream stall graph.',
805 formatter_class=argparse.RawDescriptionHelpFormatter,
807 parser.add_argument(
'filename',
808 type=argparse.FileType(
'r'), # open file 809 help='file to process')
810 parser.add_argument(
'-g',
'--graph',
812 metavar=
"'stall.pdf'",
815 help=
'''Create pdf file of stream stall graph. If -g is specified 816 by itself, the default file name is \'stall.pdf\'. Otherwise, the 817 argument to the -g option is the filename.''')
818 parser.add_argument(
'-s',
'--stack',
820 help=
'''Create stack plot, combining all stream-specific info. 821 Can be used only when -g is specified.''')
822 parser.add_argument(
'-e',
'--external',
823 action=
'store_false',
824 help=
'''Suppress display of external work in graphs.''')
825 parser.add_argument(
'-o',
'--order',
827 help=
'''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
828 args = parser.parse_args()
831 inputFile = args.filename
833 shownStacks = args.stack
834 displayExternalWork = args.external
835 checkOrder = args.order
838 if pdfFile
is not None:
842 matplotlib.use(
"PDF")
843 import matplotlib.pyplot
as plt
844 if not re.match(
r'^[\w\.]+$', pdfFile):
845 print(
"Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile))
846 print(
"Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
850 extension = pdfFile.split(
'.')[-1]
851 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
852 if not extension
in supported_filetypes:
853 print(
"A graph cannot be saved to a filename with extension '{}'.".
format(extension))
854 print(
"The allowed extensions are:")
855 for filetype
in supported_filetypes:
859 if pdfFile
is None and shownStacks:
860 print(
"The -s (--stack) option can be used only when the -g (--graph) option is specified.")
863 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
867 sys.stderr.write(
">processing data\n")
871 sys.stderr.write(
">preparing ASCII art\n")
872 createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
874 sys.stderr.write(
">creating PDF\n")
875 createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def mergeContiguousBlocks(blocks)
def consolidateContiguousBlocks(numStreams, streamInfo)
S & print(S &os, JobReport::InputFile const &f)
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def chooseParser(inputFile)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
def reduceSortedPoints(ps)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
def processingSteps(self)
def processingStepsFromStallMonitorOutput(f, moduleNames)
def processingSteps(self)
def remove(d, key, TELL=False)
def readLogFile(inputFile)
def __init__(self, x_, y_)
def adjacentDiff(pairLists)
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run