2 from itertools
import groupby
3 from operator
import attrgetter,itemgetter
5 from collections
import defaultdict
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

After running the job, execute this script and pass the name of the
StallMonitor log file to the script.

By default, the script will then print an 'ASCII art' stall graph
which consists of a line of text for each time a module or the
source stops or starts. Each line contains the name of the module
which either started or stopped running, and the number of modules
running on each stream at that moment in time. After that will be
the time and stream number. Then if a module just started, you
will also see the amount of time the module spent between finishing
its prefetching and starting. The state of a module is represented
by a symbol:

  plus ("+") the stream has just finished waiting and is starting a module
  minus ("-") the stream just finished running a module

If a module had to wait more than 0.1 seconds, the end of the line
will have "STALLED". Startup actions, e.g. reading conditions,
may affect results for the first few events.

Using the command line arguments described above you can make the
program create a PDF file with actual graphs instead of the 'ASCII art'
output.

Once the graph is completed, the program outputs the list of modules
which had the greatest total stall times. The list is sorted by
total stall time and written in descending order. In addition, the
list of all stall times for the module is given.

There is an inferior alternative (an old obsolete way).
Instead of using the StallMonitor Service, you can use the
Tracer Service. Make sure to use the 'printTimestamps' option:
  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
There are problems associated with this and it is not recommended.'''
63 kStartedSourceDelayedRead=7
64 kFinishedSourceDelayedRead=8
67 kSourceFindEvent =
"sourceFindEvent" 68 kSourceDelayedRead =
"sourceDelayedRead" 74 if not l
or l[0] ==
'#':
76 (step,payload) = tuple(l.split(
None,1))
77 payload=payload.split()
80 if step ==
'E' or step ==
'e':
85 stream =
int(payload[0])
86 time =
int(payload[-1])
93 if step ==
'S' or step ==
's':
94 name = kSourceFindEvent
95 trans = kStartedSource
98 trans = kFinishedSource
101 moduleID = payload[1]
106 if step ==
'p' or step ==
'M' or step ==
'm':
112 if step ==
'm' or step ==
'M':
113 isEvent = (
int(payload[2]) == 0)
114 name = moduleNames[moduleID]
118 elif step ==
'A' or step ==
'a':
119 trans = kStartedAcquire
121 trans = kFinishedAcquire
122 name = moduleNames[moduleID]
127 elif step ==
'R' or step == 'r': 128 trans = kStartedSourceDelayedRead 130 trans = kFinishedSourceDelayedRead 131 name = kSourceDelayedRead 133 if trans
is not None:
134 yield (name,trans,stream,time, isEvent)
141 numStreamsFromSource = 0
145 if l
and l[0] ==
'M':
149 numStreams =
int(i[1])+1
151 if numStreams == 0
and l
and l[0] ==
'S':
152 s =
int(l.split(
' ')[1])
153 if s > numStreamsFromSource:
154 numStreamsFromSource = s
155 if len(l) > 5
and l[0:2] ==
"#M":
156 (id,name)=tuple(l[2:].
split())
157 moduleNames[id] = name
161 numStreams = numStreamsFromSource +1
165 for n
in six.iteritems(moduleNames):
170 """Create a generator which can step through the file and return each processing step. 171 Using a generator reduces the memory overhead when parsing a large file. 179 time = line.split(
" ")[1]
180 time = time.split(
":")
181 time =
int(time[0])*60*60+
int(time[1])*60+
float(time[2])
182 time =
int(1000*time)
213 streamsThatSawFirstEvent = set()
221 if l.find(
"processing event :") != -1:
222 name = kSourceFindEvent
223 trans = kStartedSource
225 if l.find(
"starting:") != -1:
226 trans = kFinishedSource
227 elif l.find(
"processing event for module") != -1:
229 if l.find(
"finished:") != -1:
230 if l.find(
"prefetching") != -1:
235 if l.find(
"prefetching") != -1:
238 name = l.split(
"'")[1]
239 elif l.find(
"processing event acquire for module:") != -1:
240 trans = kStartedAcquire
241 if l.find(
"finished:") != -1:
242 trans = kFinishedAcquire
243 name = l.split(
"'")[1]
244 elif l.find(
"event delayed read from source") != -1:
245 trans = kStartedSourceDelayedRead
246 if l.find(
"finished:") != -1:
247 trans = kFinishedSourceDelayedRead
248 name = kSourceDelayedRead
249 if trans
is not None:
253 time = time - startTime
254 streamIndex = l.find(
"stream = ")
255 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
256 maxNameSize =
max(maxNameSize, len(name))
258 if trans == kFinishedSource
and not stream
in streamsThatSawFirstEvent:
261 processingSteps.append((name,kStartedSource,stream,time,
True))
262 streamsThatSawFirstEvent.add(stream)
264 processingSteps.append((name,trans,stream,time,
True))
265 numStreams =
max(numStreams, stream+1)
268 return (processingSteps,numStreams,maxNameSize)
274 return self._processingSteps
279 firstLine = inputFile.readline().rstrip()
283 fifthLine = inputFile.readline().rstrip()
285 if (firstLine.find(
"# Transition") != -1)
or (firstLine.find(
"# Step") != -1):
286 print "> ... Parsing StallMonitor output." 287 return StallMonitorParser
289 if firstLine.find(
"++") != -1
or fifthLine.find(
"++") != -1:
292 print "> ... Parsing Tracer output." 296 print "Unknown input format." 302 return parseInput(inputFile)
316 streamTime = [0]*numStreams
317 streamState = [0]*numStreams
319 modulesActiveOnStream = [{}
for x
in xrange(numStreams)]
320 for n,trans,s,time,isEvent
in processingSteps:
323 modulesOnStream = modulesActiveOnStream[s]
324 if trans == kPrefetchEnd:
325 modulesOnStream[n] = time
326 elif trans == kStarted
or trans == kStartedAcquire:
327 if n
in modulesOnStream:
328 waitTime = time - modulesOnStream[n]
329 modulesOnStream.pop(n,
None)
331 elif trans == kFinished
or trans == kFinishedAcquire:
334 elif trans == kStartedSourceDelayedRead:
335 if streamState[s] == 0:
336 waitTime = time - streamTime[s]
337 elif trans == kStartedSource:
338 modulesOnStream.clear()
339 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
341 if waitTime
is not None:
342 if waitTime > kStallThreshold:
343 t = stalledModules.setdefault(n,[])
345 return stalledModules
349 streamTime = [0]*numStreams
350 streamState = [0]*numStreams
351 modulesActiveOnStreams = [{}
for x
in xrange(numStreams)]
352 for n,trans,s,time,isEvent
in processingSteps:
354 modulesActiveOnStream = modulesActiveOnStreams[s]
355 if trans == kPrefetchEnd:
356 modulesActiveOnStream[n] = time
358 elif trans == kStartedAcquire
or trans == kStarted:
359 if n
in modulesActiveOnStream:
360 waitTime = time - modulesActiveOnStream[n]
361 modulesActiveOnStream.pop(n,
None)
363 elif trans == kFinishedAcquire
or trans == kFinished:
366 elif trans == kStartedSourceDelayedRead:
367 if streamState[s] == 0:
368 waitTime = time - streamTime[s]
369 elif trans == kStartedSource:
370 modulesActiveOnStream.clear()
371 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
373 states =
"%-*s: " % (maxNameSize,n)
374 if trans == kStartedAcquire
or trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedSource:
378 for index, state
in enumerate(streamState):
379 if n==kSourceFindEvent
and index == s:
382 states +=
str(state)+
" " 383 states +=
" -- " +
str(time/1000.) +
" " +
str(s) +
" " 384 if waitTime
is not None:
385 states +=
" %.2f"% (waitTime/1000.)
386 if waitTime > kStallThreshold:
395 for name,t
in six.iteritems(stalledModules):
396 maxNameSize =
max(maxNameSize, len(name))
398 priorities.append((name,sum(t),t))
401 return cmp(i[1],j[1])
402 priorities.sort(cmp=sumSort, reverse=
True)
404 nameColumn =
"Stalled Module" 405 maxNameSize =
max(maxNameSize, len(nameColumn))
407 stallColumn =
"Tot Stall Time" 408 stallColumnLength = len(stallColumn)
410 print "%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times" 411 for n,s,t
in priorities:
412 paddedName =
"%-*s:" % (maxNameSize,n)
413 print paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t])
422 return "(x: {}, y: {})".
format(self.
x,self.
y)
432 tmp =
Point(ps[0].x, ps[0].y)
437 reducedPoints.append(tmp)
438 tmp =
Point(p.x, p.y)
439 reducedPoints.append(tmp)
440 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
446 for pairList
in pairLists:
447 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
448 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
449 points.sort(key=attrgetter(
'x'))
461 if len(self.
data) != 0:
462 tmp += self.
data[-1][1]
464 tmp.sort(key=attrgetter(
'x'))
466 self.data.append((graphType, tmp))
483 oldStreamInfo = streamInfo
484 streamInfo = [[]
for x
in xrange(numStreams)]
486 for s
in xrange(numStreams):
488 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
489 for info
in oldStreamInfo[s][1:]:
490 start,length,color = info.unpack()
491 if color == lastColor
and lastStartTime+lastTimeLength == start:
492 lastTimeLength += length
495 lastStartTime = start
496 lastTimeLength = length
513 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
514 for start,length,height
in oldBlocks[1:]:
515 if height == lastHeight
and lastStartTime+lastTimeLength == start:
516 lastTimeLength += length
518 blocks.append((lastStartTime,lastTimeLength,lastHeight))
519 lastStartTime = start
520 lastTimeLength = length
522 blocks.append((lastStartTime,lastTimeLength,lastHeight))
528 points = sorted(points, key=attrgetter(
'x'))
532 for t1,t2
in zip(points, points[1:]):
540 if streamHeight < streamHeightCut:
542 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
543 preparedTimes.sort(key=itemgetter(2))
546 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
547 theTS = [(t[0],t[1])
for t
in ts]
549 theTimes = [(t[0]/1000.,t[1]/1000.)
for t
in theTS]
550 yspan = (stream-0.4+height,height*(nthreads-1))
551 ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
553 allStackTimes[color].extend(theTS*(nthreads-threadOffset))
556 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
558 stalledModuleNames = set([x
for x
in stalledModuleInfo.iterkeys()])
559 streamLowestRow = [[]
for x
in xrange(numStreams)]
560 modulesActiveOnStreams = [set()
for x
in xrange(numStreams)]
561 acquireActiveOnStreams = [set()
for x
in xrange(numStreams)]
562 externalWorkOnStreams = [set()
for x
in xrange(numStreams)]
563 previousFinishTime = [
None for x
in xrange(numStreams)]
564 streamRunningTimes = [[]
for x
in xrange(numStreams)]
565 streamExternalWorkRunningTimes = [[]
for x
in xrange(numStreams)]
566 maxNumberOfConcurrentModulesOnAStream = 1
567 externalWorkModulesInJob =
False 568 previousTime = [0
for x
in xrange(numStreams)]
571 finishBeforeStart = [set()
for x
in xrange(numStreams)]
572 finishAcquireBeforeStart = [set()
for x
in xrange(numStreams)]
573 countSource = [0
for x
in xrange(numStreams)]
574 countDelayedSource = [0
for x
in xrange(numStreams)]
575 countExternalWork = [defaultdict(int)
for x
in xrange(numStreams)]
578 for n,trans,s,time,isEvent
in processingSteps:
579 if timeOffset
is None:
584 if time < previousTime[s]:
585 time = previousTime[s]
586 previousTime[s] = time
588 activeModules = modulesActiveOnStreams[s]
589 acquireModules = acquireActiveOnStreams[s]
590 externalWorkModules = externalWorkOnStreams[s]
592 if trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedAcquire
or trans == kStartedSource :
599 if trans == kStarted:
600 countExternalWork[s][n] -= 1
601 if n
in finishBeforeStart[s]:
602 finishBeforeStart[s].
remove(n)
604 elif trans == kStartedAcquire:
605 if n
in finishAcquireBeforeStart[s]:
606 finishAcquireBeforeStart[s].
remove(n)
609 if trans == kStartedSourceDelayedRead:
610 countDelayedSource[s] += 1
611 if countDelayedSource[s] < 1:
613 elif trans == kStartedSource:
615 if countSource[s] < 1:
618 moduleNames = activeModules.copy()
619 moduleNames.update(acquireModules)
620 if trans == kStartedAcquire:
621 acquireModules.add(n)
625 if moduleNames
or externalWorkModules:
626 startTime = previousFinishTime[s]
627 previousFinishTime[s] = time
629 if trans == kStarted
and n
in externalWorkModules:
630 externalWorkModules.remove(n)
631 streamExternalWorkRunningTimes[s].
append(
Point(time, -1))
633 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
634 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
635 elif trans == kFinished
or trans == kFinishedSourceDelayedRead
or trans == kFinishedAcquire
or trans == kFinishedSource :
637 if trans == kFinished:
638 if n
not in activeModules:
639 finishBeforeStart[s].
add(n)
642 if trans == kFinishedSourceDelayedRead:
643 countDelayedSource[s] -= 1
644 if countDelayedSource[s] < 0:
646 elif trans == kFinishedSource:
648 if countSource[s] < 0:
651 if trans == kFinishedAcquire:
653 countExternalWork[s][n] += 1
654 if displayExternalWork:
655 externalWorkModulesInJob =
True 656 if (
not checkOrder)
or countExternalWork[s][n] > 0:
657 externalWorkModules.add(n)
658 streamExternalWorkRunningTimes[s].
append(
Point(time,+1))
659 if checkOrder
and n
not in acquireModules:
660 finishAcquireBeforeStart[s].
add(n)
663 startTime = previousFinishTime[s]
664 previousFinishTime[s] = time
665 moduleNames = activeModules.copy()
666 moduleNames.update(acquireModules)
668 if trans == kFinishedAcquire:
669 acquireModules.remove(n)
670 elif trans == kFinishedSourceDelayedRead:
671 if countDelayedSource[s] == 0:
672 activeModules.remove(n)
673 elif trans == kFinishedSource:
674 if countSource[s] == 0:
675 activeModules.remove(n)
677 activeModules.remove(n)
679 if startTime
is not None:
685 elif (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
688 for n
in moduleNames:
689 if n
in stalledModuleNames:
698 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
701 [xH,yH] = fig.get_size_inches()
702 fig.set_size_inches(xH,yH*4/3)
703 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
704 axStack = plt.subplot2grid((4,1),(3,0))
706 ax.set_xlabel(
"Time (sec)")
707 ax.set_ylabel(
"Stream ID")
708 ax.set_ylim(-0.5,numStreams-0.5)
709 ax.yaxis.set_ticks(xrange(numStreams))
711 height = 0.8/maxNumberOfConcurrentModulesOnAStream
712 allStackTimes={
'green': [],
'limegreen':[],
'red': [],
'blue': [],
'orange': [],
'darkviolet': []}
713 for iStream,lowestRow
in enumerate(streamLowestRow):
714 times=[(x.begin/1000., x.delta/1000.)
for x
in lowestRow]
715 colors=[x.color
for x
in lowestRow]
717 ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
720 for info
in lowestRow:
721 if not info.color ==
'darkviolet':
722 allStackTimes[info.color].
append((info.begin, info.delta))
725 if maxNumberOfConcurrentModulesOnAStream > 1
or externalWorkModulesInJob:
727 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
729 perStreamTimesWithExtendedWork =
list(perStreamRunningTimes)
730 perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
733 allStackTimes, ax, i, height,
736 addToStackTimes=
False,
741 allStackTimes, ax, i, height,
744 addToStackTimes=
True,
749 allStackTimes, ax, i, height,
752 addToStackTimes=
True,
757 print "> ... Generating stack" 759 for color
in [
'green',
'limegreen',
'blue',
'red',
'orange',
'darkviolet']:
760 tmp = allStackTimes[color]
762 stack.update(color, tmp)
764 for stk
in reversed(stack.data):
770 for p1,p2
in zip(stk[1], stk[1][1:]):
772 xs.append((p1.x, p2.x-p1.x, height))
773 xs.sort(key = itemgetter(2))
776 for height, xpairs
in groupby(xs, itemgetter(2)):
777 finalxs = [(e[0]/1000.,e[1]/1000.)
for e
in xpairs]
779 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
781 axStack.set_xlabel(
"Time (sec)");
782 axStack.set_ylabel(
"# modules");
783 axStack.set_xlim(ax.get_xlim())
784 axStack.tick_params(top=
'off')
786 fig.text(0.1, 0.95,
"modules running event", color =
"green", horizontalalignment =
'left')
787 fig.text(0.1, 0.92,
"modules running other", color =
"limegreen", horizontalalignment =
'left')
788 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
789 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
790 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
791 if displayExternalWork:
792 fig.text(0.9, 0.92,
"external work", color =
"darkviolet", horizontalalignment =
'right')
793 print "> ... Saving to file: '{}'".
format(pdfFile)
797 if __name__==
"__main__":
803 parser = argparse.ArgumentParser(description=
'Convert a text file created by cmsRun into a stream stall graph.',
804 formatter_class=argparse.RawDescriptionHelpFormatter,
806 parser.add_argument(
'filename',
807 type=argparse.FileType(
'r'), # open file 808 help='file to process')
809 parser.add_argument(
'-g',
'--graph',
811 metavar=
"'stall.pdf'",
814 help=
'''Create pdf file of stream stall graph. If -g is specified 815 by itself, the default file name is \'stall.pdf\'. Otherwise, the 816 argument to the -g option is the filename.''')
817 parser.add_argument(
'-s',
'--stack',
819 help=
'''Create stack plot, combining all stream-specific info. 820 Can be used only when -g is specified.''')
821 parser.add_argument(
'-e',
'--external',
822 action=
'store_false',
823 help=
'''Suppress display of external work in graphs.''')
824 parser.add_argument(
'-o',
'--order',
826 help=
'''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
827 args = parser.parse_args()
830 inputFile = args.filename
832 shownStacks = args.stack
833 displayExternalWork = args.external
834 checkOrder = args.order
837 if pdfFile
is not None:
841 matplotlib.use(
"PDF")
842 import matplotlib.pyplot
as plt
843 if not re.match(
r'^[\w\.]+$', pdfFile):
844 print "Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile)
845 print "Only characters 0-9, a-z, A-Z, '_', and '.' are allowed." 849 extension = pdfFile.split(
'.')[-1]
850 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
851 if not extension
in supported_filetypes:
852 print "A graph cannot be saved to a filename with extension '{}'.".
format(extension)
853 print "The allowed extensions are:" 854 for filetype
in supported_filetypes:
855 print " '.{}'".
format(filetype)
858 if pdfFile
is None and shownStacks:
859 print "The -s (--stack) option can be used only when the -g (--graph) option is specified." 862 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
866 sys.stderr.write(
">processing data\n")
870 sys.stderr.write(
">preparing ASCII art\n")
871 createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
873 sys.stderr.write(
">creating PDF\n")
874 createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def mergeContiguousBlocks(blocks)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def chooseParser(inputFile)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
def reduceSortedPoints(ps)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
def processingSteps(self)
def processingStepsFromStallMonitorOutput(f, moduleNames)
def processingSteps(self)
def remove(d, key, TELL=False)
def readLogFile(inputFile)
def __init__(self, x_, y_)
def adjacentDiff(pairLists)
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
How EventSelector::AcceptEvent() decides whether to accept an event for output; otherwise it is excluding the probing of: A single or multiple positive criterion, and the trigger will pass if any such matching triggers are PASS or EXCEPTION. [A criterion that matches no triggers at all is detected and causes a throw.] A single negative criterion, with an expectation of appropriate bit checking in the decision, and the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run