2 from itertools
import groupby
3 from operator
import attrgetter,itemgetter
5 from collections
import defaultdict
10 To Use: Add the StallMonitor Service to the cmsRun job you want to check for 11 stream stalls. Use something like this in the configuration: 13 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log"))) 15 After running the job, execute this script and pass the name of the 16 StallMonitor log file to the script. 18 By default, the script will then print an 'ASCII art' stall graph 19 which consists of a line of text for each time a module or the 20 source stops or starts. Each line contains the name of the module 21 which either started or stopped running, and the number of modules 22 running on each stream at that moment in time. After that will be 23 the time and stream number. Then if a module just started, you 24 will also see the amount of time the module spent between finishing 25 its prefetching and starting. The state of a module is represented 28 plus ("+") the stream has just finished waiting and is starting a module 29 minus ("-") the stream just finished running a module 31 If a module had to wait more than 0.1 seconds, the end of the line 32 will have "STALLED". Startup actions, e.g. reading conditions, 33 may affect results for the first few events. 35 Using the command line arguments described above you can make the 36 program create a PDF file with actual graphs instead of the 'ASCII art' 39 Once the graph is completed, the program outputs the list of modules 40 which had the greatest total stall times. The list is sorted by 41 total stall time and written in descending order. In addition, the 42 list of all stall times for the module is given. 44 There is an inferior alternative (an old obsolete way). 45 Instead of using the StallMonitor Service, you can use the 46 Tracer Service. Make sure to use the 'printTimestamps' option 47 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True)) 48 There are problems associated with this and it is not recommended.''' 62 kStartedSourceDelayedRead=7
63 kFinishedSourceDelayedRead=8
66 kSourceFindEvent =
"sourceFindEvent" 67 kSourceDelayedRead =
"sourceDelayedRead" 73 if not l
or l[0] ==
'#':
75 (step,payload) = tuple(l.split(
None,1))
76 payload=payload.split()
79 if step ==
'E' or step ==
'e':
84 stream =
int(payload[0])
85 time =
int(payload[-1])
92 if step ==
'S' or step ==
's':
93 name = kSourceFindEvent
94 trans = kStartedSource
97 trans = kFinishedSource
100 moduleID = payload[1]
105 if step ==
'p' or step ==
'M' or step ==
'm':
111 if step ==
'm' or step ==
'M':
112 isEvent = (
int(payload[2]) == 0)
113 name = moduleNames[moduleID]
117 elif step ==
'A' or step ==
'a':
118 trans = kStartedAcquire
120 trans = kFinishedAcquire
121 name = moduleNames[moduleID]
126 elif step ==
'R' or step == 'r': 127 trans = kStartedSourceDelayedRead 129 trans = kFinishedSourceDelayedRead 130 name = kSourceDelayedRead 132 if trans
is not None:
133 yield (name,trans,stream,time, isEvent)
143 if l
and l[0] ==
'M':
147 numStreams =
int(i[1])+1
149 if len(l) > 5
and l[0:2] ==
"#M":
150 (id,name)=tuple(l[2:].
split())
151 moduleNames[id] = name
157 for n
in moduleNames.iteritems():
162 """Create a generator which can step through the file and return each processing step. 163 Using a generator reduces the memory overhead when parsing a large file. 171 time = line.split(
" ")[1]
172 time = time.split(
":")
173 time =
int(time[0])*60*60+
int(time[1])*60+
float(time[2])
174 time =
int(1000*time)
205 streamsThatSawFirstEvent = set()
213 if l.find(
"processing event :") != -1:
214 name = kSourceFindEvent
215 trans = kStartedSource
217 if l.find(
"starting:") != -1:
218 trans = kFinishedSource
219 elif l.find(
"processing event for module") != -1:
221 if l.find(
"finished:") != -1:
222 if l.find(
"prefetching") != -1:
227 if l.find(
"prefetching") != -1:
230 name = l.split(
"'")[1]
231 elif l.find(
"processing event acquire for module:") != -1:
232 trans = kStartedAcquire
233 if l.find(
"finished:") != -1:
234 trans = kFinishedAcquire
235 name = l.split(
"'")[1]
236 elif l.find(
"event delayed read from source") != -1:
237 trans = kStartedSourceDelayedRead
238 if l.find(
"finished:") != -1:
239 trans = kFinishedSourceDelayedRead
240 name = kSourceDelayedRead
241 if trans
is not None:
245 time = time - startTime
246 streamIndex = l.find(
"stream = ")
247 stream =
int(l[streamIndex+9:l.find(
" ",streamIndex+10)])
248 maxNameSize =
max(maxNameSize, len(name))
250 if trans == kFinishedSource
and not stream
in streamsThatSawFirstEvent:
253 processingSteps.append((name,kStartedSource,stream,time,
True))
254 streamsThatSawFirstEvent.add(stream)
256 processingSteps.append((name,trans,stream,time,
True))
257 numStreams =
max(numStreams, stream+1)
260 return (processingSteps,numStreams,maxNameSize)
266 return self._processingSteps
271 firstLine = inputFile.readline().rstrip()
275 fifthLine = inputFile.readline().rstrip()
277 if (firstLine.find(
"# Transition") != -1)
or (firstLine.find(
"# Step") != -1):
278 print "> ... Parsing StallMonitor output." 279 return StallMonitorParser
281 if firstLine.find(
"++") != -1
or fifthLine.find(
"++") != -1:
284 print "> ... Parsing Tracer output." 288 print "Unknown input format." 294 return parseInput(inputFile)
308 streamTime = [0]*numStreams
309 streamState = [0]*numStreams
311 modulesActiveOnStream = [{}
for x
in xrange(numStreams)]
312 for n,trans,s,time,isEvent
in processingSteps:
315 modulesOnStream = modulesActiveOnStream[s]
316 if trans == kPrefetchEnd:
317 modulesOnStream[n] = time
318 elif trans == kStarted
or trans == kStartedAcquire:
319 if n
in modulesOnStream:
320 waitTime = time - modulesOnStream[n]
321 modulesOnStream.pop(n,
None)
323 elif trans == kFinished
or trans == kFinishedAcquire:
326 elif trans == kStartedSourceDelayedRead:
327 if streamState[s] == 0:
328 waitTime = time - streamTime[s]
329 elif trans == kStartedSource:
330 modulesOnStream.clear()
331 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
333 if waitTime
is not None:
334 if waitTime > kStallThreshold:
335 t = stalledModules.setdefault(n,[])
337 return stalledModules
341 streamTime = [0]*numStreams
342 streamState = [0]*numStreams
343 modulesActiveOnStreams = [{}
for x
in xrange(numStreams)]
344 for n,trans,s,time,isEvent
in processingSteps:
346 modulesActiveOnStream = modulesActiveOnStreams[s]
347 if trans == kPrefetchEnd:
348 modulesActiveOnStream[n] = time
350 elif trans == kStartedAcquire
or trans == kStarted:
351 if n
in modulesActiveOnStream:
352 waitTime = time - modulesActiveOnStream[n]
353 modulesActiveOnStream.pop(n,
None)
355 elif trans == kFinishedAcquire
or trans == kFinished:
358 elif trans == kStartedSourceDelayedRead:
359 if streamState[s] == 0:
360 waitTime = time - streamTime[s]
361 elif trans == kStartedSource:
362 modulesActiveOnStream.clear()
363 elif trans == kFinishedSource
or trans == kFinishedSourceDelayedRead:
365 states =
"%-*s: " % (maxNameSize,n)
366 if trans == kStartedAcquire
or trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedSource:
370 for index, state
in enumerate(streamState):
371 if n==kSourceFindEvent
and index == s:
374 states +=
str(state)+
" " 375 states +=
" -- " +
str(time/1000.) +
" " +
str(s) +
" " 376 if waitTime
is not None:
377 states +=
" %.2f"% (waitTime/1000.)
378 if waitTime > kStallThreshold:
387 for name,t
in stalledModules.iteritems():
388 maxNameSize =
max(maxNameSize, len(name))
390 priorities.append((name,sum(t),t))
393 return cmp(i[1],j[1])
394 priorities.sort(cmp=sumSort, reverse=
True)
396 nameColumn =
"Stalled Module" 397 maxNameSize =
max(maxNameSize, len(nameColumn))
399 stallColumn =
"Tot Stall Time" 400 stallColumnLength = len(stallColumn)
402 print "%-*s" % (maxNameSize, nameColumn),
"%-*s"%(stallColumnLength,stallColumn),
" Stall Times" 403 for n,s,t
in priorities:
404 paddedName =
"%-*s:" % (maxNameSize,n)
405 print paddedName,
"%-*.2f"%(stallColumnLength,s/1000.),
", ".
join([
"%.2f"%(x/1000.)
for x
in t])
414 return "(x: {}, y: {})".
format(self.
x,self.
y)
424 tmp =
Point(ps[0].x, ps[0].y)
429 reducedPoints.append(tmp)
430 tmp =
Point(p.x, p.y)
431 reducedPoints.append(tmp)
432 reducedPoints = [p
for p
in reducedPoints
if p.y != 0]
438 for pairList
in pairLists:
439 points += [
Point(x[0], 1)
for x
in pairList
if x[1] != 0]
440 points += [
Point(sum(x),-1)
for x
in pairList
if x[1] != 0]
441 points.sort(key=attrgetter(
'x'))
453 if len(self.
data) != 0:
454 tmp += self.
data[-1][1]
456 tmp.sort(key=attrgetter(
'x'))
458 self.data.append((graphType, tmp))
475 oldStreamInfo = streamInfo
476 streamInfo = [[]
for x
in xrange(numStreams)]
478 for s
in xrange(numStreams):
480 lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].
unpack()
481 for info
in oldStreamInfo[s][1:]:
482 start,length,color = info.unpack()
483 if color == lastColor
and lastStartTime+lastTimeLength == start:
484 lastTimeLength += length
487 lastStartTime = start
488 lastTimeLength = length
505 lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
506 for start,length,height
in oldBlocks[1:]:
507 if height == lastHeight
and lastStartTime+lastTimeLength == start:
508 lastTimeLength += length
510 blocks.append((lastStartTime,lastTimeLength,lastHeight))
511 lastStartTime = start
512 lastTimeLength = length
514 blocks.append((lastStartTime,lastTimeLength,lastHeight))
520 points = sorted(points, key=attrgetter(
'x'))
524 for t1,t2
in zip(points, points[1:]):
532 if streamHeight < streamHeightCut:
534 preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
535 preparedTimes.sort(key=itemgetter(2))
538 for nthreads, ts
in groupby(preparedTimes, itemgetter(2)):
539 theTS = [(t[0],t[1])
for t
in ts]
541 theTimes = [(t[0]/1000.,t[1]/1000.)
for t
in theTS]
542 yspan = (stream-0.4+height,height*(nthreads-1))
543 ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
545 allStackTimes[color].extend(theTS*(nthreads-threadOffset))
548 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
550 stalledModuleNames = set([x
for x
in stalledModuleInfo.iterkeys()])
551 streamLowestRow = [[]
for x
in xrange(numStreams)]
552 modulesActiveOnStreams = [set()
for x
in xrange(numStreams)]
553 acquireActiveOnStreams = [set()
for x
in xrange(numStreams)]
554 externalWorkOnStreams = [set()
for x
in xrange(numStreams)]
555 previousFinishTime = [
None for x
in xrange(numStreams)]
556 streamRunningTimes = [[]
for x
in xrange(numStreams)]
557 streamExternalWorkRunningTimes = [[]
for x
in xrange(numStreams)]
558 maxNumberOfConcurrentModulesOnAStream = 1
559 previousTime = [0
for x
in xrange(numStreams)]
562 finishBeforeStart = [set()
for x
in xrange(numStreams)]
563 finishAcquireBeforeStart = [set()
for x
in xrange(numStreams)]
564 countSource = [0
for x
in xrange(numStreams)]
565 countDelayedSource = [0
for x
in xrange(numStreams)]
566 countExternalWork = [defaultdict(int)
for x
in xrange(numStreams)]
569 for n,trans,s,time,isEvent
in processingSteps:
570 if timeOffset
is None:
575 if time < previousTime[s]:
576 time = previousTime[s]
577 previousTime[s] = time
579 activeModules = modulesActiveOnStreams[s]
580 acquireModules = acquireActiveOnStreams[s]
581 externalWorkModules = externalWorkOnStreams[s]
583 if trans == kStarted
or trans == kStartedSourceDelayedRead
or trans == kStartedAcquire
or trans == kStartedSource :
590 if trans == kStarted:
591 countExternalWork[s][n] -= 1
592 if n
in finishBeforeStart[s]:
593 finishBeforeStart[s].
remove(n)
595 elif trans == kStartedAcquire:
596 if n
in finishAcquireBeforeStart[s]:
597 finishAcquireBeforeStart[s].
remove(n)
600 if trans == kStartedSourceDelayedRead:
601 countDelayedSource[s] += 1
602 if countDelayedSource[s] < 1:
604 elif trans == kStartedSource:
606 if countSource[s] < 1:
609 moduleNames = activeModules.copy()
610 moduleNames.update(acquireModules)
611 if trans == kStartedAcquire:
612 acquireModules.add(n)
616 if moduleNames
or externalWorkModules:
617 startTime = previousFinishTime[s]
618 previousFinishTime[s] = time
620 if trans == kStarted
and n
in externalWorkModules:
621 externalWorkModules.remove(n)
622 streamExternalWorkRunningTimes[s].
append(
Point(time, -1))
624 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
625 maxNumberOfConcurrentModulesOnAStream =
max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
626 elif trans == kFinished
or trans == kFinishedSourceDelayedRead
or trans == kFinishedAcquire
or trans == kFinishedSource :
628 if trans == kFinished:
629 if n
not in activeModules:
630 finishBeforeStart[s].
add(n)
633 if trans == kFinishedSourceDelayedRead:
634 countDelayedSource[s] -= 1
635 if countDelayedSource[s] < 0:
637 elif trans == kFinishedSource:
639 if countSource[s] < 0:
642 if trans == kFinishedAcquire:
644 countExternalWork[s][n] += 1
645 if displayExternalWork:
646 if (
not checkOrder)
or countExternalWork[s][n] > 0:
647 externalWorkModules.add(n)
648 streamExternalWorkRunningTimes[s].
append(
Point(time,+1))
649 if checkOrder
and n
not in acquireModules:
650 finishAcquireBeforeStart[s].
add(n)
653 startTime = previousFinishTime[s]
654 previousFinishTime[s] = time
655 moduleNames = activeModules.copy()
656 moduleNames.update(acquireModules)
658 if trans == kFinishedAcquire:
659 acquireModules.remove(n)
660 elif trans == kFinishedSourceDelayedRead:
661 if countDelayedSource[s] == 0:
662 activeModules.remove(n)
663 elif trans == kFinishedSource:
664 if countSource[s] == 0:
665 activeModules.remove(n)
667 activeModules.remove(n)
669 if startTime
is not None:
675 elif (kSourceDelayedRead
in moduleNames)
or (kSourceFindEvent
in moduleNames):
678 for n
in moduleNames:
679 if n
in stalledModuleNames:
688 fig, ax = plt.subplots(nrows=nr, squeeze=
True)
691 [xH,yH] = fig.get_size_inches()
692 fig.set_size_inches(xH,yH*4/3)
693 ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
694 axStack = plt.subplot2grid((4,1),(3,0))
696 ax.set_xlabel(
"Time (sec)")
697 ax.set_ylabel(
"Stream ID")
698 ax.set_ylim(-0.5,numStreams-0.5)
699 ax.yaxis.set_ticks(xrange(numStreams))
701 height = 0.8/maxNumberOfConcurrentModulesOnAStream
702 allStackTimes={
'green': [],
'limegreen':[],
'red': [],
'blue': [],
'orange': [],
'darkviolet': []}
703 for iStream,lowestRow
in enumerate(streamLowestRow):
704 times=[(x.begin/1000., x.delta/1000.)
for x
in lowestRow]
705 colors=[x.color
for x
in lowestRow]
707 ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
710 for info
in lowestRow:
711 if not info.color ==
'darkviolet':
712 allStackTimes[info.color].
append((info.begin, info.delta))
715 if maxNumberOfConcurrentModulesOnAStream > 1:
717 for i,perStreamRunningTimes
in enumerate(streamRunningTimes):
719 perStreamTimesWithExtendedWork =
list(perStreamRunningTimes)
720 perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
723 allStackTimes, ax, i, height,
726 addToStackTimes=
False,
731 allStackTimes, ax, i, height,
734 addToStackTimes=
True,
739 allStackTimes, ax, i, height,
742 addToStackTimes=
True,
747 print "> ... Generating stack" 749 for color
in [
'green',
'limegreen',
'blue',
'red',
'orange',
'darkviolet']:
750 tmp = allStackTimes[color]
752 stack.update(color, tmp)
754 for stk
in reversed(stack.data):
760 for p1,p2
in zip(stk[1], stk[1][1:]):
762 xs.append((p1.x, p2.x-p1.x, height))
763 xs.sort(key = itemgetter(2))
766 for height, xpairs
in groupby(xs, itemgetter(2)):
767 finalxs = [(e[0]/1000.,e[1]/1000.)
for e
in xpairs]
769 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
771 axStack.set_xlabel(
"Time (sec)");
772 axStack.set_ylabel(
"# threads");
773 axStack.set_xlim(ax.get_xlim())
774 axStack.tick_params(top=
'off')
776 fig.text(0.1, 0.95,
"modules running event", color =
"green", horizontalalignment =
'left')
777 fig.text(0.1, 0.92,
"modules running other", color =
"limegreen", horizontalalignment =
'left')
778 fig.text(0.5, 0.95,
"stalled module running", color =
"red", horizontalalignment =
'center')
779 fig.text(0.9, 0.95,
"read from input", color =
"orange", horizontalalignment =
'right')
780 fig.text(0.5, 0.92,
"multiple modules running", color =
"blue", horizontalalignment =
'center')
781 if displayExternalWork:
782 fig.text(0.9, 0.92,
"external work", color =
"darkviolet", horizontalalignment =
'right')
783 print "> ... Saving to file: '{}'".
format(pdfFile)
787 if __name__==
"__main__":
793 parser =
argparse.ArgumentParser(description=
'Convert a text file created by cmsRun into a stream stall graph.',
796 parser.add_argument(
'filename',
798 help='file to process')
799 parser.add_argument(
'-g',
'--graph',
801 metavar=
"'stall.pdf'",
804 help=
'''Create pdf file of stream stall graph. If -g is specified 805 by itself, the default file name is \'stall.pdf\'. Otherwise, the 806 argument to the -g option is the filename.''')
807 parser.add_argument(
'-s',
'--stack',
809 help=
'''Create stack plot, combining all stream-specific info. 810 Can be used only when -g is specified.''')
811 parser.add_argument(
'-e',
'--external',
812 action=
'store_false',
813 help=
'''Suppress display of external work in graphs.''')
814 parser.add_argument(
'-o',
'--order',
816 help=
'''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
817 args = parser.parse_args()
820 inputFile = args.filename
822 shownStacks = args.stack
823 displayExternalWork = args.external
824 checkOrder = args.order
827 if pdfFile
is not None:
831 matplotlib.use(
"PDF")
832 import matplotlib.pyplot
as plt
833 if not re.match(
r'^[\w\.]+$', pdfFile):
834 print "Malformed file name '{}' supplied with the '-g' option.".
format(pdfFile)
835 print "Only characters 0-9, a-z, A-Z, '_', and '.' are allowed." 839 extension = pdfFile.split(
'.')[-1]
840 supported_filetypes = plt.figure().canvas.get_supported_filetypes()
841 if not extension
in supported_filetypes:
842 print "A graph cannot be saved to a filename with extension '{}'.".
format(extension)
843 print "The allowed extensions are:" 844 for filetype
in supported_filetypes:
845 print " '.{}'".
format(filetype)
848 if pdfFile
is None and shownStacks:
849 print "The -s (--stack) option can be used only when the -g (--graph) option is specified." 852 sys.stderr.write(
">reading file: '{}'\n".
format(inputFile.name))
856 sys.stderr.write(
">processing data\n")
860 sys.stderr.write(
">preparing ASCII art\n")
861 createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
863 sys.stderr.write(
">creating PDF\n")
864 createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def mergeContiguousBlocks(blocks)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def chooseParser(inputFile)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
def reduceSortedPoints(ps)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
def processingSteps(self)
def processingStepsFromStallMonitorOutput(f, moduleNames)
def processingSteps(self)
def remove(d, key, TELL=False)
def readLogFile(inputFile)
def __init__(self, x_, y_)
def adjacentDiff(pairLists)
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run