2 from itertools
import groupby
3 from operator
import attrgetter,itemgetter
5 from collections
import defaultdict
10 To Use: Add the StallMonitor Service to the cmsRun job you want to check for 11 stream stalls. Use something like this in the configuration: 13 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log"))) 15 After running the job, execute this script and pass the name of the 16 StallMonitor log file to the script. 18 By default, the script will then print an 'ASCII art' stall graph 19 which consists of a line of text for each time a module or the 20 source stops or starts. Each line contains the name of the module 21 which either started or stopped running, and the number of modules 22 running on each stream at that moment in time. After that will be 23 the time and stream number. Then if a module just started, you 24 will also see the amount of time the module spent between finishing 25 its prefetching and starting. The state of a module is represented 28 plus ("+") the stream has just finished waiting and is starting a module 29 minus ("-") the stream just finished running a module 31 If a module had to wait more than 0.1 seconds, the end of the line 32 will have "STALLED". Startup actions, e.g. reading conditions, 33 may affect results for the first few events. 35 Using the command line arguments described above you can make the 36 program create a PDF file with actual graphs instead of the 'ASCII art' 39 Once the graph is completed, the program outputs the list of modules 40 which had the greatest total stall times. The list is sorted by 41 total stall time and written in descending order. In addition, the 42 list of all stall times for the module is given. 44 There is an inferior alternative (an old obsolete way). 45 Instead of using the StallMonitor Service, you can use the 46 Tracer Service. Make sure to use the 'printTimestamps' option 47 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True)) 48 There are problems associated with this and it is not recommended.''' 62 kStartedSourceDelayedRead=7
63 kFinishedSourceDelayedRead=8
66 kSourceFindEvent =
"sourceFindEvent" 67 kSourceDelayedRead =
"sourceDelayedRead" 77 if not l
or l[0] ==
'#':
78 if len(l) > 5
and l[0:2] ==
"#M":
79 (id,name)=tuple(l[2:].
split())
80 moduleNames[id] = name
82 (step,payload) = tuple(l.split(
None,1))
83 payload=payload.split()
86 if step ==
'E' or step ==
'e':
91 stream =
int(payload[0])
92 time =
int(payload[-1])
97 if step ==
'S' or step ==
's':
98 name = kSourceFindEvent
99 trans = kStartedSource
102 trans = kFinishedSource
105 moduleID = payload[1]
110 if step ==
'p' or step ==
'M' or step ==
'm':
116 name = moduleNames[moduleID]
120 elif step ==
'A' or step ==
'a':
121 trans = kStartedAcquire
123 trans = kFinishedAcquire
124 name = moduleNames[moduleID]
129 elif step ==
'R' or step == 'r': 130 trans = kStartedSourceDelayedRead 132 trans = kFinishedSourceDelayedRead 133 name = kSourceDelayedRead 135 if trans
is not None:
136 numStreams =
max(numStreams, stream+1)
137 maxNameSize =
max(maxNameSize, len(name))
138 processingSteps.append((name,trans,stream,time))
141 return (processingSteps,numStreams,maxNameSize)
146 time = line.split(
" ")[1]
147 time = time.split(
":")
148 time =
int(time[0])*60*60+
int(time[1])*60+
float(time[2])
149 time =
int(1000*time)
180 streamsThatSawFirstEvent = set()
188 if l.find(
"processing event :") != -1:
189 name = kSourceFindEvent
190 trans = kStartedSource
192 if l.find(
"starting:") != -1:
193 trans = kFinishedSource
194 elif l.find(
"processing event for module") != -1:
196 if l.find(
"finished:") != -1:
197 if l.find(
"prefetching") != -1:
202 if l.find(
"prefetching") != -1:
205 name = l.split(
"'")[1]
206 elif l.find(
"processing event acquire for module:") != -1:
207 trans = kStartedAcquire
208 if l.find(
"finished:") != -1:
209 trans = kFinishedAcquire
210 name = l.split(
"'")[1]
211 elif l.find(
"event delayed read from source") != -1:
212 trans = kStartedSourceDelayedRead
213 if l.find(
"finished:") != -1:
214 trans = kFinishedSourceDelayedRead
215 name = kSourceDelayedRead
216 if trans
is not None:
220 time = time - startTime
221 streamIndex = l.find(
"stream = ")
222 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
223 maxNameSize =
max(maxNameSize, len(name))
225 if trans == kFinishedSource
and not stream
in streamsThatSawFirstEvent:
228 processingSteps.append((name,kStartedSource,stream,time))
229 streamsThatSawFirstEvent.add(stream)
231 processingSteps.append((name,trans,stream,time))
232 numStreams =
max(numStreams, stream+1)
235 return (processingSteps,numStreams,maxNameSize)
240 firstLine = inputFile.readline().rstrip()
244 fifthLine = inputFile.readline().rstrip()
247 if firstLine.find(
"# Step") != -1:
248 print "> ... Parsing StallMonitor output." 249 return parseStallMonitorOutput
251 if firstLine.find(
"++") != -1
or fifthLine.find(
"++") != -1:
254 print "> ... Parsing Tracer output." 255 return parseTracerOutput
258 print "Unknown input format." 264 return parseInput(inputFile)
278 streamTime = [0]*numStreams
279 streamState = [0]*numStreams
281 modulesActiveOnStream = [{}
for x
in xrange(numStreams)]
282 for n,trans,s,time
in processingSteps:
285 modulesOnStream = modulesActiveOnStream[s]
286 if trans == kPrefetchEnd:
287 modulesOnStream[n] = time
288 elif trans == kStarted
or trans == kStartedAcquire:
289 if n
in modulesOnStream:
290 waitTime = time - modulesOnStream[n]
291 modulesOnStream.pop(n,
None)
293 elif trans == kFinished
or trans == kFinishedAcquire:
296 elif trans == kStartedSourceDelayedRead:
297 if streamState[s] == 0:
298 waitTime = time - streamTime[s]
299 elif trans == kStartedSource:
300 modulesOnStream.clear()
301 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
303 if waitTime
is not None:
304 if waitTime > kStallThreshold:
305 t = stalledModules.setdefault(n,[])
307 return stalledModules
311 streamTime = [0]*numStreams
312 streamState = [0]*numStreams
313 modulesActiveOnStreams = [{}
for x
in xrange(numStreams)]
314 for n,trans,s,time
in processingSteps:
316 modulesActiveOnStream = modulesActiveOnStreams[s]
317 if trans == kPrefetchEnd:
318 modulesActiveOnStream[n] = time
320 elif trans == kStartedAcquire
or trans == kStarted:
321 if n
in modulesActiveOnStream:
322 waitTime = time - modulesActiveOnStream[n]
323 modulesActiveOnStream.pop(n,
None)
325 elif trans == kFinishedAcquire
or trans == kFinished:
328 elif trans == kStartedSourceDelayedRead:
329 if streamState[s] == 0:
330 waitTime = time - streamTime[s]
331 elif trans == kStartedSource:
332 modulesActiveOnStream.clear()
333 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
335 states =
"%-*s: " % (maxNameSize,n)
336 if trans == kStartedAcquire
or trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedSource:
340 for index, state
in enumerate(streamState):
341 if n==kSourceFindEvent
and index == s:
344 states +=
str(state)+
" " 345 states +=
" -- " +
str(time/1000.) +
" " +
str(s) +
" " 346 if waitTime
is not None:
347 states +=
" %.2f"% (waitTime/1000.)
348 if waitTime > kStallThreshold:
357 for name,t
in stalledModules.iteritems():
358 maxNameSize =
max(maxNameSize, len(name))
360 priorities.append((name,sum(t),t))
363 return cmp(i[1],j[1])
364 priorities.sort(cmp=sumSort, reverse=
True)
366 nameColumn =
"Stalled Module" 367 maxNameSize =
max(maxNameSize, len(nameColumn))
369 stallColumn =
"Tot Stall Time" 370 stallColumnLength = len(stallColumn)
372 print "%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times" 373 for n,s,t
in priorities:
374 paddedName =
"%-*s:" % (maxNameSize,n)
375 print paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t])
384 return "(x: {}, y: {})".
format(self.
x,self.
y)
394 tmp =
Point(ps[0].x, ps[0].y)
399 reducedPoints.append(tmp)
400 tmp =
Point(p.x, p.y)
401 reducedPoints.append(tmp)
402 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
408 for pairList
in pairLists:
409 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
410 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
411 points.sort(key=attrgetter(
'x'))
423 if len(self.
data) != 0:
424 tmp += self.
data[-1][1]
426 tmp.sort(key=attrgetter(
'x'))
428 self.data.append((graphType, tmp))
445 oldStreamInfo = streamInfo
446 streamInfo = [[]
for x
in xrange(numStreams)]
448 for s
in xrange(numStreams):
450 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
451 for info
in oldStreamInfo[s][1:]:
452 start,length,color = info.unpack()
453 if color == lastColor
and lastStartTime+lastTimeLength == start:
454 lastTimeLength += length
457 lastStartTime = start
458 lastTimeLength = length
475 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
476 for start,length,height
in oldBlocks[1:]:
477 if height == lastHeight
and lastStartTime+lastTimeLength == start:
478 lastTimeLength += length
480 blocks.append((lastStartTime,lastTimeLength,lastHeight))
481 lastStartTime = start
482 lastTimeLength = length
484 blocks.append((lastStartTime,lastTimeLength,lastHeight))
490 points = sorted(points, key=attrgetter(
'x'))
494 for t1,t2
in zip(points, points[1:]):
502 if streamHeight < streamHeightCut:
504 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
505 preparedTimes.sort(key=itemgetter(2))
508 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
509 theTS = [(t[0],t[1])
for t
in ts]
511 theTimes = [(t[0]/1000.,t[1]/1000.)
for t
in theTS]
512 yspan = (stream-0.4+height,height*(nthreads-1))
513 ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
515 allStackTimes[color].extend(theTS*(nthreads-threadOffset))
518 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
520 stalledModuleNames = set([x
for x
in stalledModuleInfo.iterkeys()])
521 streamLowestRow = [[]
for x
in xrange(numStreams)]
522 modulesActiveOnStreams = [set()
for x
in xrange(numStreams)]
523 acquireActiveOnStreams = [set()
for x
in xrange(numStreams)]
524 externalWorkOnStreams = [set()
for x
in xrange(numStreams)]
525 previousFinishTime = [
None for x
in xrange(numStreams)]
526 streamRunningTimes = [[]
for x
in xrange(numStreams)]
527 streamExternalWorkRunningTimes = [[]
for x
in xrange(numStreams)]
528 maxNumberOfConcurrentModulesOnAStream = 1
529 previousTime = [0
for x
in xrange(numStreams)]
532 finishBeforeStart = [set()
for x
in xrange(numStreams)]
533 finishAcquireBeforeStart = [set()
for x
in xrange(numStreams)]
534 countSource = [0
for x
in xrange(numStreams)]
535 countDelayedSource = [0
for x
in xrange(numStreams)]
536 countExternalWork = [defaultdict(int)
for x
in xrange(numStreams)]
538 for n,trans,s,time
in processingSteps:
543 if time < previousTime[s]:
544 time = previousTime[s]
545 previousTime[s] = time
547 activeModules = modulesActiveOnStreams[s]
548 acquireModules = acquireActiveOnStreams[s]
549 externalWorkModules = externalWorkOnStreams[s]
551 if trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedAcquire
or trans == kStartedSource :
558 if trans == kStarted:
559 countExternalWork[s][n] -= 1
560 if n
in finishBeforeStart[s]:
561 finishBeforeStart[s].
remove(n)
563 elif trans == kStartedAcquire:
564 if n
in finishAcquireBeforeStart[s]:
565 finishAcquireBeforeStart[s].
remove(n)
568 if trans == kStartedSourceDelayedRead:
569 countDelayedSource[s] += 1
570 if countDelayedSource[s] < 1:
572 elif trans == kStartedSource:
574 if countSource[s] < 1:
577 moduleNames = activeModules.copy()
578 moduleNames.update(acquireModules)
579 if trans == kStartedAcquire:
580 acquireModules.add(n)
584 if moduleNames
or externalWorkModules:
585 startTime = previousFinishTime[s]
586 previousFinishTime[s] = time
588 if trans == kStarted
and n
in externalWorkModules:
589 externalWorkModules.remove(n)
590 streamExternalWorkRunningTimes[s].
append(
Point(time, -1))
592 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
593 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
594 elif trans == kFinished
or trans == kFinishedSourceDelayedRead
or trans == kFinishedAcquire
or trans == kFinishedSource :
596 if trans == kFinished:
597 if n
not in activeModules:
598 finishBeforeStart[s].
add(n)
601 if trans == kFinishedSourceDelayedRead:
602 countDelayedSource[s] -= 1
603 if countDelayedSource[s] < 0:
605 elif trans == kFinishedSource:
607 if countSource[s] < 0:
610 if trans == kFinishedAcquire:
612 countExternalWork[s][n] += 1
613 if displayExternalWork:
614 if (
not checkOrder)
or countExternalWork[s][n] > 0:
615 externalWorkModules.add(n)
616 streamExternalWorkRunningTimes[s].
append(
Point(time,+1))
617 if checkOrder
and n
not in acquireModules:
618 finishAcquireBeforeStart[s].
add(n)
621 startTime = previousFinishTime[s]
622 previousFinishTime[s] = time
623 moduleNames = activeModules.copy()
624 moduleNames.update(acquireModules)
626 if trans == kFinishedAcquire:
627 acquireModules.remove(n)
628 elif trans == kFinishedSourceDelayedRead:
629 if countDelayedSource[s] == 0:
630 activeModules.remove(n)
631 elif trans == kFinishedSource:
632 if countSource[s] == 0:
633 activeModules.remove(n)
635 activeModules.remove(n)
637 if startTime
is not None:
641 elif (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
644 for n
in moduleNames:
645 if n
in stalledModuleNames:
654 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
657 [xH,yH] = fig.get_size_inches()
658 fig.set_size_inches(xH,yH*4/3)
659 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
660 axStack = plt.subplot2grid((4,1),(3,0))
662 ax.set_xlabel(
"Time (sec)")
663 ax.set_ylabel(
"Stream ID")
664 ax.set_ylim(-0.5,numStreams-0.5)
665 ax.yaxis.set_ticks(xrange(numStreams))
667 height = 0.8/maxNumberOfConcurrentModulesOnAStream
668 allStackTimes={
'green': [],
'red': [],
'blue': [],
'orange': [],
'darkviolet': []}
669 for iStream,lowestRow
in enumerate(streamLowestRow):
670 times=[(x.begin/1000., x.delta/1000.)
for x
in lowestRow]
671 colors=[x.color
for x
in lowestRow]
673 ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
676 for info
in lowestRow:
677 if not info.color ==
'darkviolet':
678 allStackTimes[info.color].
append((info.begin, info.delta))
681 if maxNumberOfConcurrentModulesOnAStream > 1:
683 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
685 perStreamTimesWithExtendedWork =
list(perStreamRunningTimes)
686 perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
689 allStackTimes, ax, i, height,
692 addToStackTimes=
False,
697 allStackTimes, ax, i, height,
700 addToStackTimes=
True,
705 allStackTimes, ax, i, height,
708 addToStackTimes=
True,
713 print "> ... Generating stack" 715 for color
in [
'green',
'blue',
'red',
'orange',
'darkviolet']:
716 tmp = allStackTimes[color]
718 stack.update(color, tmp)
720 for stk
in reversed(stack.data):
726 for p1,p2
in zip(stk[1], stk[1][1:]):
728 xs.append((p1.x, p2.x-p1.x, height))
729 xs.sort(key = itemgetter(2))
732 for height, xpairs
in groupby(xs, itemgetter(2)):
733 finalxs = [(e[0]/1000.,e[1]/1000.)
for e
in xpairs]
735 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
737 axStack.set_xlabel(
"Time (sec)");
738 axStack.set_ylabel(
"# threads");
739 axStack.set_xlim(ax.get_xlim())
740 axStack.tick_params(top=
'off')
742 fig.text(0.1, 0.95,
"modules running", color =
"green", horizontalalignment =
'left')
743 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
744 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
745 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
746 if displayExternalWork:
747 fig.text(0.9, 0.92,
"external work", color =
"darkviolet", horizontalalignment =
'right')
748 print "> ... Saving to file: '{}'".
format(pdfFile)
752 if __name__==
"__main__":
758 parser =
argparse.ArgumentParser(description=
'Convert a text file created by cmsRun into a stream stall graph.',
761 parser.add_argument(
'filename',
763 help='file to process')
764 parser.add_argument(
'-g',
'--graph',
766 metavar=
"'stall.pdf'",
769 help=
'''Create pdf file of stream stall graph. If -g is specified 770 by itself, the default file name is \'stall.pdf\'. Otherwise, the 771 argument to the -g option is the filename.''')
772 parser.add_argument(
'-s',
'--stack',
774 help=
'''Create stack plot, combining all stream-specific info. 775 Can be used only when -g is specified.''')
776 parser.add_argument(
'-e',
'--external',
777 action=
'store_false',
778 help=
'''Suppress display of external work in graphs.''')
779 parser.add_argument(
'-o',
'--order',
781 help=
'''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
782 args = parser.parse_args()
785 inputFile = args.filename
787 shownStacks = args.stack
788 displayExternalWork = args.external
789 checkOrder = args.order
792 if pdfFile
is not None:
796 matplotlib.use(
"PDF")
797 import matplotlib.pyplot
as plt
798 if not re.match(
r'^[\w\.]+$', pdfFile):
799 print "Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile)
800 print "Only characters 0-9, a-z, A-Z, '_', and '.' are allowed." 804 extension = pdfFile.split(
'.')[-1]
805 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
806 if not extension
in supported_filetypes:
807 print "A graph cannot be saved to a filename with extension '{}'.".
format(extension)
808 print "The allowed extensions are:" 809 for filetype
in supported_filetypes:
810 print " '.{}'".
format(filetype)
813 if pdfFile
is None and shownStacks:
814 print "The -s (--stack) option can be used only when the -g (--graph) option is specified." 817 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
818 processingSteps,numStreams,maxNameSize =
readLogFile(inputFile)
821 sys.stderr.write(
">processing data\n")
824 sys.stderr.write(
">preparing ASCII art\n")
827 sys.stderr.write(
">creating PDF\n")
828 createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModules, displayExternalWork, checkOrder)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def mergeContiguousBlocks(blocks)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def chooseParser(inputFile)
def parseStallMonitorOutput(f)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
def reduceSortedPoints(ps)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
def remove(d, key, TELL=False)
def readLogFile(inputFile)
def __init__(self, x_, y_)
def adjacentDiff(pairLists)
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run