edmStreamStallGrapher.py — listing of the CMSSW stream-stall analysis script (page header navigation removed).
1 #!/usr/bin/env python
2 from itertools import groupby
3 from operator import attrgetter,itemgetter
4 import sys
5 from collections import defaultdict
6 
7 #----------------------------------------------
def printHelp():
    """Return the long usage text shown as the argparse epilog."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
50 
# A wait longer than this is reported as a stall (units match the log timestamps).
kStallThreshold=100 #in milliseconds
# Set to True by chooseParser() when the input turns out to be Tracer output.
kTracerInput=False

#Stream states
# Integer codes for the transition kinds yielded by the parsers.
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
# Placeholder "module" names used for source activity in the step tuples.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"
68 
69 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f, moduleNames):
    """Generator yielding (name, transition, stream, time, isEvent) tuples
    parsed from a StallMonitor log file object *f*.

    moduleNames maps the module-id strings found in the log to module labels.
    Parsing lazily keeps the memory overhead low for large log files.
    """
    for rawLine in f:
        line = rawLine.strip()
        # skip blank lines and comments
        if not line or line.startswith('#'):
            continue
        step, rest = line.split(None, 1)
        fields = rest.split()
        # 'E'/'e' (event boundary records) are not used for stall analysis
        if step in ('E', 'e'):
            continue

        # payload layout: <stream id> <..other fields..> <time since begin job>
        streamID = int(fields[0])
        timestamp = int(fields[-1])
        transition = None
        isEvent = True
        label = None

        if step in ('S', 's'):
            # 'S' = begin of event creation in source, 's' = its end;
            # the start of an event marks the end of the framework part
            label = kSourceFindEvent
            transition = kFinishedSource if step == 's' else kStartedSource
        else:
            # moduleID is the second payload field for all remaining steps
            moduleID = fields[1]
            if step in ('p', 'M', 'm'):
                # 'p' = prefetch end, 'M' = processing begin, 'm' = processing end
                if step == 'p':
                    transition = kPrefetchEnd
                elif step == 'm':
                    transition = kFinished
                else:
                    transition = kStarted
                if step in ('m', 'M'):
                    # third field distinguishes event processing from other transitions
                    isEvent = (int(fields[2]) == 0)
                label = moduleNames[moduleID]
            elif step in ('A', 'a'):
                # 'A'/'a' = begin/end of the module acquire function
                transition = kFinishedAcquire if step == 'a' else kStartedAcquire
                label = moduleNames[moduleID]
            elif step in ('R', 'r'):
                # 'R'/'r' = begin/end of a delayed read from the source
                transition = kFinishedSourceDelayedRead if step == 'r' else kStartedSourceDelayedRead
                label = kSourceDelayedRead

        if transition is not None:
            yield (label, transition, streamID, timestamp, isEvent)
    return
136 
138  def __init__(self,f):
139  numStreams = 0
140  moduleNames = {}
141  for rawl in f:
142  l = rawl.strip()
143  if l and l[0] == 'M':
144  i = l.split(' ')
145  if i[3] == '4':
146  #found global begin run
147  numStreams = int(i[1])+1
148  break
149  if len(l) > 5 and l[0:2] == "#M":
150  (id,name)=tuple(l[2:].split())
151  moduleNames[id] = name
152  continue
153  self._f = f
154  self.numStreams =numStreams
155  self._moduleNames = moduleNames
156  self.maxNameSize =0
157  for n in moduleNames.iteritems():
158  self.maxNameSize = max(self.maxNameSize,len(n))
159  self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
160 
    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        # Rewind so repeated calls re-parse from the start of the log.
        self._f.seek(0)
        # NOTE(review): the line delegating to the parser generator is missing
        # from this listing — presumably it returns
        # processingStepsFromStallMonitorOutput(self._f, self._moduleNames); confirm
        # against the original source.
167 
168 #----------------------------------------------
169 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the HH:MM:SS.s timestamp found in the second whitespace-separated
    token of a Tracer log line, converted to integer milliseconds."""
    hours, minutes, seconds = line.split(" ")[1].split(":")
    totalSeconds = int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)
    return int(1000 * totalSeconds)  # milliseconds
176 
177 #----------------------------------------------
178 # The next function parses the Tracer output.
179 # Here are some differences to consider if you use Tracer output
180 # instead of the StallMonitor output.
181 # - The time in the text of the Tracer output is not as precise
182 # as the StallMonitor (.01 s vs .001 s)
183 # - The MessageLogger bases the time on when the message printed
184 # and not when it was initially queued up to print which smears
185 # the accuracy of the times.
186 # - Both of the previous things can produce some strange effects
187 # in the output plots.
188 # - The file size of the Tracer text file is much larger.
189 # - The CPU work needed to parse the Tracer files is larger.
190 # - The Tracer log file is expected to have "++" in the first
191 # or fifth line. If there are extraneous lines at the beginning
192 # you have to remove them.
193 # - The ascii printout out will have one extraneous line
194 # near the end for the SourceFindEvent start.
195 # - The only advantage I can see is that you have only
196 # one output file to handle instead of two, the regular
197 # log file and the StallMonitor output.
198 # We might should just delete the Tracer option because it is
199 # clearly inferior ...
def parseTracerOutput(f):
    """Parse a Tracer log file object *f* completely.

    Returns (processingSteps, numStreams, maxNameSize) where processingSteps
    is a list of (name, transition, stream, time, isEvent) tuples.  Closes
    *f* when done.
    """
    steps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for line in f:
        transition = None
        # The source start/stop is estimated from the end of the previous event
        # and the start of the next one; historically the Tracer lines for the
        # source itself do not include the stream number.
        if line.find("processing event :") != -1:
            name = kSourceFindEvent
            transition = kStartedSource
            if line.find("starting:") != -1:
                # the start of the event is the estimated end of the source
                transition = kFinishedSource
        elif line.find("processing event for module") != -1:
            if line.find("finished:") != -1:
                transition = kPrefetchEnd if line.find("prefetching") != -1 else kFinished
            else:
                if line.find("prefetching") != -1:
                    # prefetch starts are intentionally ignored
                    continue
                transition = kStarted
            name = line.split("'")[1]
        elif line.find("processing event acquire for module:") != -1:
            transition = kFinishedAcquire if line.find("finished:") != -1 else kStartedAcquire
            name = line.split("'")[1]
        elif line.find("event delayed read from source") != -1:
            transition = kFinishedSourceDelayedRead if line.find("finished:") != -1 else kStartedSourceDelayedRead
            name = kSourceDelayedRead

        if transition is not None:
            t = getTime(line)
            if startTime == 0:
                # first timestamp seen becomes the time origin
                startTime = t
            t = t - startTime
            idx = line.find("stream = ")
            stream = int(line[idx + 9:line.find(" ", idx + 10)])
            maxNameSize = max(maxNameSize, len(name))

            if transition == kFinishedSource and stream not in streamsThatSawFirstEvent:
                # There is no previous event before the first event on a stream,
                # so fake the source start at the same time (admittedly wrong,
                # but there is no better estimate available).
                steps.append((name, kStartedSource, stream, t, True))
                streamsThatSawFirstEvent.add(stream)

            steps.append((name, transition, stream, t, True))
            numStreams = max(numStreams, stream + 1)

    f.close()
    return (steps, numStreams, maxNameSize)
261 
    def __init__(self,f):
        # Unlike StallMonitorParser, the Tracer log is parsed eagerly and
        # completely at construction time.
        self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)
    def processingSteps(self):
        # Return the already-parsed list of
        # (name, transition, stream, time, isEvent) tuples.
        return self._processingSteps
267 
268 #----------------------------------------------
269 def chooseParser(inputFile):
270 
271  firstLine = inputFile.readline().rstrip()
272  for i in range(3):
273  inputFile.readline()
274  # Often the Tracer log file starts with 4 lines not from the Tracer
275  fifthLine = inputFile.readline().rstrip()
276  inputFile.seek(0) # Rewind back to beginning
277  if (firstLine.find("# Transition") != -1) or (firstLine.find("# Step") != -1):
278  print "> ... Parsing StallMonitor output."
279  return StallMonitorParser
280 
281  if firstLine.find("++") != -1 or fifthLine.find("++") != -1:
282  global kTracerInput
283  kTracerInput = True
284  print "> ... Parsing Tracer output."
285  return TracerParser
286  else:
287  inputFile.close()
288  print "Unknown input format."
289  exit(1)
290 
291 #----------------------------------------------
def readLogFile(inputFile):
    """Detect the log format of *inputFile* and return a parser instance
    constructed on it."""
    return chooseParser(inputFile)(inputFile)
295 
296 #----------------------------------------------
297 #
298 # modules: The time between prefetch finished and 'start processing' is
299 # the time it took to acquire any resources which is by definition the
300 # stall time.
301 #
302 # source: The source just records how long it spent doing work,
303 # not how long it was stalled. We can get a lower bound on the stall
304 # time for delayed reads by measuring the time the stream was doing
305 # no work up till the start of the source delayed read.
306 #
def findStalledModules(processingSteps, numStreams):
    """Return {module name: [stall times in ms]} for modules that waited
    longer than kStallThreshold.

    For ordinary modules the stall is the gap between prefetch-finish and
    start; for delayed source reads it is the idle time on the stream up to
    the start of the read (a lower bound on the true stall).
    """
    streamTime = [0] * numStreams       # last time each stream finished work
    streamState = [0] * numStreams      # number of modules running per stream
    stalledModules = {}
    prefetchDoneOnStream = [{} for _ in range(numStreams)]
    for name, transition, stream, t, isEvent in processingSteps:
        waitTime = None
        onStream = prefetchDoneOnStream[stream]
        if transition == kPrefetchEnd:
            onStream[name] = t
        elif transition in (kStarted, kStartedAcquire):
            if name in onStream:
                waitTime = t - onStream[name]
                onStream.pop(name, None)
            streamState[stream] += 1
        elif transition in (kFinished, kFinishedAcquire):
            streamState[stream] -= 1
            streamTime[stream] = t
        elif transition == kStartedSourceDelayedRead:
            # idle stream up to the delayed read counts as waiting
            if streamState[stream] == 0:
                waitTime = t - streamTime[stream]
        elif transition == kStartedSource:
            onStream.clear()
        elif transition in (kFinishedSource, kFinishedSourceDelayedRead):
            streamTime[stream] = t
        if waitTime is not None and waitTime > kStallThreshold:
            stalledModules.setdefault(name, []).append(waitTime)
    return stalledModules
338 
339 #----------------------------------------------
340 def createAsciiImage(processingSteps, numStreams, maxNameSize):
341  streamTime = [0]*numStreams
342  streamState = [0]*numStreams
343  modulesActiveOnStreams = [{} for x in xrange(numStreams)]
344  for n,trans,s,time,isEvent in processingSteps:
345  waitTime = None
346  modulesActiveOnStream = modulesActiveOnStreams[s]
347  if trans == kPrefetchEnd:
348  modulesActiveOnStream[n] = time
349  continue
350  elif trans == kStartedAcquire or trans == kStarted:
351  if n in modulesActiveOnStream:
352  waitTime = time - modulesActiveOnStream[n]
353  modulesActiveOnStream.pop(n, None)
354  streamState[s] +=1
355  elif trans == kFinishedAcquire or trans == kFinished:
356  streamState[s] -=1
357  streamTime[s] = time
358  elif trans == kStartedSourceDelayedRead:
359  if streamState[s] == 0:
360  waitTime = time - streamTime[s]
361  elif trans == kStartedSource:
362  modulesActiveOnStream.clear()
363  elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
364  streamTime[s] = time
365  states = "%-*s: " % (maxNameSize,n)
366  if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
367  states +="+ "
368  else:
369  states +="- "
370  for index, state in enumerate(streamState):
371  if n==kSourceFindEvent and index == s:
372  states +="* "
373  else:
374  states +=str(state)+" "
375  states += " -- " + str(time/1000.) + " " + str(s) + " "
376  if waitTime is not None:
377  states += " %.2f"% (waitTime/1000.)
378  if waitTime > kStallThreshold:
379  states += " STALLED"
380 
381  print states
382 
383 #----------------------------------------------
384 def printStalledModulesInOrder(stalledModules):
385  priorities = []
386  maxNameSize = 0
387  for name,t in stalledModules.iteritems():
388  maxNameSize = max(maxNameSize, len(name))
389  t.sort(reverse=True)
390  priorities.append((name,sum(t),t))
391 
392  def sumSort(i,j):
393  return cmp(i[1],j[1])
394  priorities.sort(cmp=sumSort, reverse=True)
395 
396  nameColumn = "Stalled Module"
397  maxNameSize = max(maxNameSize, len(nameColumn))
398 
399  stallColumn = "Tot Stall Time"
400  stallColumnLength = len(stallColumn)
401 
402  print "%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times"
403  for n,s,t in priorities:
404  paddedName = "%-*s:" % (maxNameSize,n)
405  print paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t])
406 
407 #--------------------------------------------------------
class Point:
    """A simple 2-D point; used as (time, +/-1 step) markers in the plots."""

    def __init__(self, x_, y_):
        self.x = x_
        self.y = y_

    def __repr__(self):
        return "(x: {}, y: {})".format(self.x, self.y)

    def __str__(self):
        return self.__repr__()
418 
419 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Collapse an x-sorted list of Points by summing the y values of points
    that share the same x, then drop entries whose summed y is zero."""
    if len(ps) < 2:
        return ps
    merged = []
    current = Point(ps[0].x, ps[0].y)
    for point in ps[1:]:
        if current.x == point.x:
            current.y += point.y
        else:
            merged.append(current)
            current = Point(point.x, point.y)
    merged.append(current)
    # entries that cancelled out carry no information for the plots
    return [p for p in merged if p.y != 0]
434 
435 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Convert (begin, length) pairs into +1/-1 step Points at the interval
    edges, skipping zero-length intervals, and return them sorted by x."""
    steps = []
    for pairs in pairLists:
        # keep the original two-pass order (+1 edges, then -1 edges) so the
        # relative order of equal-x points is unchanged after the stable sort
        steps.extend(Point(pair[0], 1) for pair in pairs if pair[1] != 0)
        steps.extend(Point(sum(pair), -1) for pair in pairs if pair[1] != 0)
    steps.sort(key=attrgetter('x'))
    return steps
443 
stackType = 'stack' # NOTE(review): not referenced anywhere in this listing — confirm before removing
445 
446 # --------------------------------------------
class Stack:
    """Accumulates cumulative per-color point layers for the stack plot."""

    def __init__(self):
        # list of (graphType, merged point list) tuples, bottom layer first
        self.data = []

    def update(self, graphType, points):
        """Stack *points* on top of the most recent layer (if any), merge
        same-x entries, and record the result under *graphType*.

        NOTE: when layers already exist, *points* is extended in place.
        """
        combined = points
        if len(self.data) != 0:
            combined += self.data[-1][1]
        combined.sort(key=attrgetter('x'))
        combined = reduceSortedPoints(combined)
        self.data.append((graphType, combined))
459 
460 #---------------------------------------------
461 # StreamInfoElement
    def __init__(self, begin_, delta_, color_):
        # begin: start time (ms); delta: duration (ms); color: plot color name
        self.begin=begin_
        self.delta=delta_
        self.color=color_

    def unpack(self):
        # Return (begin, delta, color) for tuple-style unpacking.
        return self.begin, self.delta, self.color
470 
471 #----------------------------------------------
472 # Consolidating contiguous blocks with the same color
473 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge adjacent StreamInfoElements of the same color whose time ranges
    touch.  Consolidation drastically reduces the size of the PDF file."""
    merged = [[] for _ in range(numStreams)]

    for s in range(numStreams):
        if not streamInfo[s]:
            continue
        lastStart, lastDelta, lastColor = streamInfo[s][0].unpack()
        for element in streamInfo[s][1:]:
            begin, delta, color = element.unpack()
            if color == lastColor and lastStart + lastDelta == begin:
                # same color and contiguous in time: extend the current block
                lastDelta += delta
            else:
                merged[s].append(StreamInfoElement(lastStart, lastDelta, lastColor))
                lastStart, lastDelta, lastColor = begin, delta, color
        # flush the final open block for this stream
        merged[s].append(StreamInfoElement(lastStart, lastDelta, lastColor))

    return merged
493 
494 #----------------------------------------------
495 # Consolidating contiguous blocks with the same color drastically
496 # reduces the size of the pdf file. Same functionality as the
497 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge adjacent (start, length, height) tuples with equal height whose
    time ranges touch.  Same idea as consolidateContiguousBlocks, but for
    plain tuples instead of StreamInfoElements."""
    if not blocks:
        return []

    merged = []
    lastStart, lastLength, lastHeight = blocks[0]
    for start, length, height in blocks[1:]:
        if height == lastHeight and lastStart + lastLength == start:
            # contiguous at the same height: grow the current block
            lastLength += length
        else:
            merged.append((lastStart, lastLength, lastHeight))
            lastStart, lastLength, lastHeight = start, length, height
    # flush the final open block
    merged.append((lastStart, lastLength, lastHeight))

    return merged
517 
518 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first for one stream and/or collect interval
    data for the later stack plot.

    points: list of Point(time, +/-1) step markers for the stream.
    allStackTimes: dict color -> list of (start, length) pairs, extended when
        addToStackTimes is True.
    ax: matplotlib axes drawn on via broken_barh when doPlot is True.
    streamHeightCut: minimum concurrent count plotted/counted (rows below the
        cut were handled elsewhere).
    threadOffset: subtracted from the concurrency level when replicating
        intervals into allStackTimes.
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    # walk consecutive point pairs, tracking the running concurrency level
    for t1,t2 in zip(points, points[1:]):
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    # group by concurrency level so each level is drawn in one call
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            theTimes = [(t[0]/1000.,t[1]/1000.) for t in theTS]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            # replicate each interval once per thread above the offset
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
546 
547 #----------------------------------------------
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
    """Create the stall graph (and, optionally, a stack plot) and save it to
    *pdfFile*.

    processingSteps: iterable of (name, transition, stream, time, isEvent).
    stalledModuleInfo: dict whose keys are the stalled module names.
    checkOrder: when True, out-of-order transitions are repaired and silently
        dropped from the plot instead of being drawn.
    NOTE(review): relies on the module-level 'plt' imported by the __main__
    block when graphics are requested — confirm when reusing elsewhere.
    """
    stalledModuleNames = set([x for x in stalledModuleInfo.iterkeys()])
    streamLowestRow = [[] for x in xrange(numStreams)]
    modulesActiveOnStreams = [set() for x in xrange(numStreams)]
    acquireActiveOnStreams = [set() for x in xrange(numStreams)]
    externalWorkOnStreams = [set() for x in xrange(numStreams)]
    previousFinishTime = [None for x in xrange(numStreams)]
    streamRunningTimes = [[] for x in xrange(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in xrange(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    previousTime = [0 for x in xrange(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in xrange(numStreams)]
    finishAcquireBeforeStart = [set() for x in xrange(numStreams)]
    countSource = [0 for x in xrange(numStreams)]
    countDelayedSource = [0 for x in xrange(numStreams)]
    countExternalWork = [defaultdict(int) for x in xrange(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # snapshot of modules already running before this one starts
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                # module resumed after external work; close the external span
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        # acquire done: module now does external work
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            # pick the color of the lowest-row block just completed
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                for n in moduleNames:
                    if n in stalledModuleNames:
                        c="red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(xrange(numStreams))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000., x.delta/1000.) for x in lowestRow] # Scale from msec to sec.
        colors=[x.color for x in lowestRow]
        # for each stream, plot the lowest row
        ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print "> ... Generating stack"
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            height = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000.,e[1]/1000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)");
        axStack.set_ylabel("# threads");
        axStack.set_xlim(ax.get_xlim())
        axStack.tick_params(top='off')

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print "> ... Saving to file: '{}'".format(pdfFile)
    plt.savefig(pdfFile)
785 
786 #=======================================
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
 by itself, the default file name is \'stall.pdf\'. Otherwise, the
 argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
 Can be used only when -g is specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    displayExternalWork = args.external
    checkOrder = args.order

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        # validate the output file name before doing any work
        if not re.match(r'^[\w\.]+$', pdfFile):
            print "Malformed file name '{}' supplied with the '-g' option.".format(pdfFile)
            print "Only characters 0-9, a-z, A-Z, '_', and '.' are allowed."
            exit(1)

        if '.' in pdfFile:
            # the extension must be one matplotlib can actually write
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if not extension in supported_filetypes:
                print "A graph cannot be saved to a filename with extension '{}'.".format(extension)
                print "The allowed extensions are:"
                for filetype in supported_filetypes:
                    print " '.{}'".format(filetype)
                exit(1)

    if pdfFile is None and shownStacks:
        print "The -s (--stack) option can be used only when the -g (--graph) option is specified."
        exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        # Tracer timestamps are imprecise, so order repair is always needed
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
    printStalledModulesInOrder(stalledModules)
Cross-reference index (auto-generated for this listing):
- createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
- Stack.update(self, graphType, points)
- findStalledModules(processingSteps, numStreams)
- consolidateContiguousBlocks(numStreams, streamInfo)
- StreamInfoElement.__init__(self, begin_, delta_, color_)
- printStalledModulesInOrder(stalledModules)
- createAsciiImage(processingSteps, numStreams, maxNameSize)
- processingStepsFromStallMonitorOutput(f, moduleNames)
- plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
(The remaining entries in the original page — C++ signatures from RemoteFile.cc, MatrixUtil.py, MVATrainer.cc and a garbled EventSelector documentation paragraph — were pulled in by the documentation generator from other files and are not part of this script.)