CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from builtins import range
4 from itertools import groupby
5 from operator import attrgetter,itemgetter
6 import sys
7 from collections import defaultdict
8 import six
9 #----------------------------------------------
def printHelp():
    """Return the long usage text shown as the argparse epilog."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
52 
# Waits longer than this are reported as stalls (units: microseconds).
kStallThreshold = 100000
# Switched to True by chooseParser when Tracer output is detected.
kTracerInput = False

# Stream states: the transition code carried in each processing step.
(kStarted,
 kFinished,
 kPrefetchEnd,
 kStartedAcquire,
 kFinishedAcquire,
 kStartedSource,
 kFinishedSource,
 kStartedSourceDelayedRead,
 kFinishedSourceDelayedRead) = range(9)

# Special names used in place of a module label for source activity.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"
70 
71 #----------------------------------------------
73  for rawl in f:
74  l = rawl.strip()
75  if not l or l[0] == '#':
76  continue
77  (step,payload) = tuple(l.split(None,1))
78  payload=payload.split()
79 
80  # Ignore these
81  if step == 'E' or step == 'e':
82  continue
83 
84  # Payload format is:
85  # <stream id> <..other fields..> <time since begin job>
86  stream = int(payload[0])
87  time = int(payload[-1])
88  trans = None
89  isEvent = True
90 
91  name = None
92  # 'S' = begin of event creation in source
93  # 's' = end of event creation in source
94  if step == 'S' or step == 's':
95  name = kSourceFindEvent
96  trans = kStartedSource
97  # The start of an event is the end of the framework part
98  if step == 's':
99  trans = kFinishedSource
100  else:
101  # moduleID is the second payload argument for all steps below
102  moduleID = payload[1]
103 
104  # 'p' = end of module prefetching
105  # 'M' = begin of module processing
106  # 'm' = end of module processing
107  if step == 'p' or step == 'M' or step == 'm':
108  trans = kStarted
109  if step == 'p':
110  trans = kPrefetchEnd
111  elif step == 'm':
112  trans = kFinished
113  if step == 'm' or step == 'M':
114  isEvent = (int(payload[2]) == 0)
115  name = moduleNames[moduleID]
116 
117  # 'A' = begin of module acquire function
118  # 'a' = end of module acquire function
119  elif step == 'A' or step == 'a':
120  trans = kStartedAcquire
121  if step == 'a':
122  trans = kFinishedAcquire
123  name = moduleNames[moduleID]
124 
125  # Delayed read from source
126  # 'R' = begin of delayed read from source
127  # 'r' = end of delayed read from source
128  elif step == 'R' or step == 'r':
129  trans = kStartedSourceDelayedRead
130  if step == 'r':
131  trans = kFinishedSourceDelayedRead
132  name = kSourceDelayedRead
133 
134  if trans is not None:
135  yield (name,trans,stream,time, isEvent)
136 
137  return
138 
140  def __init__(self,f):
141  numStreams = 0
142  numStreamsFromSource = 0
143  moduleNames = {}
144  for rawl in f:
145  l = rawl.strip()
146  if l and l[0] == 'M':
147  i = l.split(' ')
148  if i[3] == '4':
149  #found global begin run
150  numStreams = int(i[1])+1
151  break
152  if numStreams == 0 and l and l[0] == 'S':
153  s = int(l.split(' ')[1])
154  if s > numStreamsFromSource:
155  numStreamsFromSource = s
156  if len(l) > 5 and l[0:2] == "#M":
157  (id,name)=tuple(l[2:].split())
158  moduleNames[id] = name
159  continue
160  self._f = f
161  if numStreams == 0:
162  numStreams = numStreamsFromSource +1
163  self.numStreams =numStreams
164  self._moduleNames = moduleNames
165  self.maxNameSize =0
166  for n in six.iteritems(moduleNames):
167  self.maxNameSize = max(self.maxNameSize,len(n))
168  self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
169 
170  def processingSteps(self):
171  """Create a generator which can step through the file and return each processing step.
172  Using a generator reduces the memory overhead when parsing a large file.
173  """
174  self._f.seek(0)
176 
177 #----------------------------------------------
178 # Utility to get time out of Tracer output text format
179 def getTime(line):
180  time = line.split(" ")[1]
181  time = time.split(":")
182  time = int(time[0])*60*60+int(time[1])*60+float(time[2])
183  time = int(1000000*time) # convert to microseconds
184  return time
185 
186 #----------------------------------------------
187 # The next function parses the Tracer output.
188 # Here are some differences to consider if you use Tracer output
189 # instead of the StallMonitor output.
190 # - The time in the text of the Tracer output is not as precise
191 # as the StallMonitor (.01 s vs .001 s)
192 # - The MessageLogger bases the time on when the message printed
193 # and not when it was initially queued up to print which smears
194 # the accuracy of the times.
195 # - Both of the previous things can produce some strange effects
196 # in the output plots.
197 # - The file size of the Tracer text file is much larger.
198 # - The CPU work needed to parse the Tracer files is larger.
199 # - The Tracer log file is expected to have "++" in the first
200 # or fifth line. If there are extraneous lines at the beginning
201 # you have to remove them.
202 # - The ascii printout out will have one extraneous line
203 # near the end for the SourceFindEvent start.
204 # - The only advantage I can see is that you have only
205 # one output file to handle instead of two, the regular
206 # log file and the StallMonitor output.
207 # We might should just delete the Tracer option because it is
208 # clearly inferior ...
210  processingSteps = []
211  numStreams = 0
212  maxNameSize = 0
213  startTime = 0
214  streamsThatSawFirstEvent = set()
215  for l in f:
216  trans = None
217  # We estimate the start and stop of the source
218  # by the end of the previous event and start of
219  # the event. This is historical, probably because
220  # the Tracer output for the begin and end of the
221  # source event does not include the stream number.
222  if l.find("processing event :") != -1:
223  name = kSourceFindEvent
224  trans = kStartedSource
225  # the end of the source is estimated using the start of the event
226  if l.find("starting:") != -1:
227  trans = kFinishedSource
228  elif l.find("processing event for module") != -1:
229  trans = kStarted
230  if l.find("finished:") != -1:
231  if l.find("prefetching") != -1:
232  trans = kPrefetchEnd
233  else:
234  trans = kFinished
235  else:
236  if l.find("prefetching") != -1:
237  #skip this since we don't care about prefetch starts
238  continue
239  name = l.split("'")[1]
240  elif l.find("processing event acquire for module:") != -1:
241  trans = kStartedAcquire
242  if l.find("finished:") != -1:
243  trans = kFinishedAcquire
244  name = l.split("'")[1]
245  elif l.find("event delayed read from source") != -1:
246  trans = kStartedSourceDelayedRead
247  if l.find("finished:") != -1:
248  trans = kFinishedSourceDelayedRead
249  name = kSourceDelayedRead
250  if trans is not None:
251  time = getTime(l)
252  if startTime == 0:
253  startTime = time
254  time = time - startTime
255  streamIndex = l.find("stream = ")
256  stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
257  maxNameSize = max(maxNameSize, len(name))
258 
259  if trans == kFinishedSource and not stream in streamsThatSawFirstEvent:
260  # This is wrong but there is no way to estimate the time better
261  # because there is no previous event for the first event.
262  processingSteps.append((name,kStartedSource,stream,time,True))
263  streamsThatSawFirstEvent.add(stream)
264 
265  processingSteps.append((name,trans,stream,time, True))
266  numStreams = max(numStreams, stream+1)
267 
268  f.close()
269  return (processingSteps,numStreams,maxNameSize)
270 
272  def __init__(self,f):
273  self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)
274  def processingSteps(self):
275  return self._processingSteps
276 
277 #----------------------------------------------
def chooseParser(inputFile):
    """Inspect the start of ``inputFile`` and return the parser class to use.

    The file is rewound to its beginning before returning.
    - StallMonitor output is recognized by '# Transition' or '# Step' on the
      first line.
    - Tracer output is recognized by '++' on the first or fifth line.
      Choosing it also sets the module-level flag kTracerInput to True.

    If neither format matches, the file is closed, a message is printed and
    the program exits with status 1 via sys.exit.
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    # Often the Tracer log file starts with 4 lines not from the Tracer,
    # so the fifth line is checked as well.
    for _ in range(3):
        inputFile.readline()
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning

    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit is always available; the exit() builtin is injected by the
    # site module and may be missing (e.g. python -S, embedded interpreters).
    sys.exit(1)
299 
300 #----------------------------------------------
def readLogFile(inputFile):
    """Detect the log format of ``inputFile`` and return a parser instance for it."""
    return chooseParser(inputFile)(inputFile)
304 
305 #----------------------------------------------
306 #
307 # modules: The time between prefetch finished and 'start processing' is
308 # the time it took to acquire any resources which is by definition the
309 # stall time.
310 #
311 # source: The source just records how long it spent doing work,
312 # not how long it was stalled. We can get a lower bound on the stall
313 # time for delayed reads by measuring the time the stream was doing
314 # no work up till the start of the source delayed read.
315 #
def findStalledModules(processingSteps, numStreams):
    """Collect stall times longer than kStallThreshold, keyed by name.

    A module's wait is measured from its kPrefetchEnd to its kStarted or
    kStartedAcquire on the same stream.  For a delayed read from the source,
    the wait is measured from the last time the stream finished something,
    but only if nothing is running on the stream at that moment.

    Returns a dict mapping each name to the list of its waits (in the same
    time units as the steps) that exceeded kStallThreshold.
    """
    lastIdleStart = [0] * numStreams
    runningCount = [0] * numStreams
    prefetchDone = [{} for _ in range(numStreams)]
    stalls = {}
    for name, transition, stream, when, _isEvent in processingSteps:
        pending = prefetchDone[stream]
        wait = None
        if transition == kPrefetchEnd:
            pending[name] = when
        elif transition in (kStarted, kStartedAcquire):
            if name in pending:
                wait = when - pending.pop(name)
            runningCount[stream] += 1
        elif transition in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastIdleStart[stream] = when
        elif transition == kStartedSourceDelayedRead:
            if runningCount[stream] == 0:
                wait = when - lastIdleStart[stream]
        elif transition == kStartedSource:
            # A new event is starting; forget prefetches from the previous one.
            pending.clear()
        elif transition in (kFinishedSource, kFinishedSourceDelayedRead):
            lastIdleStart[stream] = when

        if wait is not None and wait > kStallThreshold:
            stalls.setdefault(name, []).append(wait)
    return stalls
347 
348 
def createModuleTiming(processingSteps, numStreams, outputFileName='module-timings.json'):
    """Write the per-module event processing durations to a JSON file.

    For steps flagged as events (isEvent true), the time from a module's
    kStarted to its next kFinished on the same stream is recorded, divided
    by 1000 (microseconds -> milliseconds), under the module's name.

    The resulting {module name: [duration, ...]} mapping is written as
    indented JSON to ``outputFileName``.  The default is
    'module-timings.json', the name used before the parameter existed.
    The mapping is also returned.

    A kFinished with no recorded kStarted for that module on that stream is
    skipped.  Previously the missing start defaulted to 0, so the raw
    timestamp itself was recorded as a duration.
    """
    import json
    moduleTimings = defaultdict(list)
    startTimes = [{} for _ in range(numStreams)]
    for n, trans, s, time, isEvent in processingSteps:
        if not isEvent:
            continue
        startsOnStream = startTimes[s]
        if trans == kStarted:
            startsOnStream[n] = time
        elif trans == kFinished:
            start = startsOnStream.pop(n, None)
            if start is None:
                continue
            moduleTimings[n].append((time - start)/1000.)

    with open(outputFileName, 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
    return moduleTimings
370 
371 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one text line per transition, showing how busy every stream is.

    Each line contains:
    - the name, padded to maxNameSize;
    - '+' for a start transition, '-' otherwise;
    - the number of running modules on each stream ('*' marks the stream on
      which the source is finding an event);
    - ' -- ', the time divided by 1000, and the stream id.

    When a start transition waited, the wait (divided by 1000) is appended,
    followed by ' STALLED' if it exceeds kStallThreshold.  For a module the
    wait is the time since its prefetch ended; for a delayed read on an idle
    stream it is the time since the stream last finished something.
    kPrefetchEnd transitions are recorded but not printed.
    """
    startTransitions = (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
    lastFinish = [0] * numStreams
    running = [0] * numStreams
    prefetched = [{} for _ in range(numStreams)]
    for name, trans, stream, when, _isEvent in processingSteps:
        ready = prefetched[stream]
        if trans == kPrefetchEnd:
            ready[name] = when
            continue

        wait = None
        if trans in (kStartedAcquire, kStarted):
            if name in ready:
                wait = when - ready.pop(name)
            running[stream] += 1
        elif trans in (kFinishedAcquire, kFinished):
            running[stream] -= 1
            lastFinish[stream] = when
        elif trans == kStartedSourceDelayedRead:
            if running[stream] == 0:
                wait = when - lastFinish[stream]
        elif trans == kStartedSource:
            ready.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = when

        pieces = ["%-*s: " % (maxNameSize, name)]
        pieces.append("+ " if trans in startTransitions else "- ")
        for index, count in enumerate(running):
            if name == kSourceFindEvent and index == stream:
                pieces.append("* ")
            else:
                pieces.append(str(count) + " ")
        pieces.append(" -- " + str(when/1000.) + " " + str(stream) + " ")
        if wait is not None:
            pieces.append(" %.2f" % (wait/1000.))
            if wait > kStallThreshold:
                pieces.append(" STALLED")
        print("".join(pieces))
414 
415 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    stalledModules maps a module name to a list of its stall times in
    microseconds (as returned by findStalledModules).  Each row shows the
    name, the total stall time and every individual stall (longest first),
    all divided by 1000, i.e. in milliseconds.

    The caller's lists are left untouched.  The previous implementation
    sorted them in place.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    priorities = []
    maxNameSize = len(nameColumn)
    for name, times in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        # sorted() rather than list.sort(): do not reorder the caller's data.
        priorities.append((name, sum(times), sorted(times, reverse=True)))
    priorities.sort(key=itemgetter(1), reverse=True)

    print("%-*s" % (maxNameSize, nameColumn), "%-*s" % (stallColumnLength, stallColumn), " Stall Times")
    for name, total, times in priorities:
        paddedName = "%-*s:" % (maxNameSize, name)
        print(paddedName, "%-*.2f" % (stallColumnLength, total/1000.), ", ".join(["%.2f" % (x/1000.) for x in times]))
436 
437 #--------------------------------------------------------
class Point:
    """A mutable (x, y) pair.

    In this file x is a time and y is a step (+1/-1) or an accumulated count.
    """
    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return "(x: {x}, y: {y})".format(x=self.x, y=self.y)

    def __repr__(self):
        return str(self)
448 
449 #--------------------------------------------------------
451  if len(ps) < 2:
452  return ps
453  reducedPoints = []
454  tmp = Point(ps[0].x, ps[0].y)
455  for p in ps[1:]:
456  if tmp.x == p.x:
457  tmp.y += p.y
458  else:
459  reducedPoints.append(tmp)
460  tmp = Point(p.x, p.y)
461  reducedPoints.append(tmp)
462  reducedPoints = [p for p in reducedPoints if p.y != 0]
463  return reducedPoints
464 
465 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (start, length) intervals into +1/-1 step Points sorted by x.

    Every interval with a nonzero length contributes Point(start, 1) and
    Point(sum(interval), -1), i.e. its end.  Points from all the given lists
    are combined and stably sorted by x.
    """
    points = []
    for pairList in pairLists:
        kept = [pair for pair in pairList if pair[1] != 0]
        points.extend(Point(pair[0], 1) for pair in kept)
        points.extend(Point(sum(pair), -1) for pair in kept)
    points.sort(key=attrgetter('x'))
    return points
473 
# NOTE(review): not referenced anywhere else in the visible source.
stackType = "stack"
475 
476 # --------------------------------------------
class Stack:
    """Cumulative layers for the stacked summary plot.

    data is a list of (graphType, points) entries.  Each update() merges the
    new step points with those of the previous layer before reducing them,
    so a layer's points describe the running total of itself and every
    layer added before it.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        # Copy first: the old ``tmp = points; tmp += ...`` extended the
        # caller's list in place.
        tmp = list(points)
        if self.data:
            tmp.extend(self.data[-1][1])

        tmp.sort(key=attrgetter('x'))
        tmp = reduceSortedPoints(tmp)
        self.data.append((graphType, tmp))
489 
490 #---------------------------------------------
491 # StreamInfoElement
493  def __init__(self, begin_, delta_, color_):
494  self.begin=begin_
495  self.delta=delta_
496  self.color=color_
497 
498  def unpack(self):
499  return self.begin, self.delta, self.color
500 
501 #----------------------------------------------
502 # Consolidating contiguous blocks with the same color
503 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge touching same-color StreamInfoElements on every stream.

    Two consecutive elements are merged when they have the same color and
    the second starts exactly where the first ends.  Returns a new list of
    per-stream lists; the input lists are not modified.
    """
    merged = [[] for _ in range(numStreams)]
    for stream in range(numStreams):
        elements = streamInfo[stream]
        if not elements:
            continue
        curBegin, curLength, curColor = elements[0].unpack()
        for element in elements[1:]:
            begin, length, color = element.unpack()
            if color == curColor and curBegin + curLength == begin:
                curLength += length
                continue
            merged[stream].append(StreamInfoElement(curBegin, curLength, curColor))
            curBegin, curLength, curColor = begin, length, color
        merged[stream].append(StreamInfoElement(curBegin, curLength, curColor))
    return merged
523 
524 #----------------------------------------------
525 # Consolidating contiguous blocks with the same color drastically
526 # reduces the size of the pdf file. Same functionality as the
527 # previous function, but with slightly different implementation.
529  oldBlocks = blocks
530 
531  blocks = []
532  if not oldBlocks:
533  return blocks
534 
535  lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
536  for start,length,height in oldBlocks[1:]:
537  if height == lastHeight and lastStartTime+lastTimeLength == start:
538  lastTimeLength += length
539  else:
540  blocks.append((lastStartTime,lastTimeLength,lastHeight))
541  lastStartTime = start
542  lastTimeLength = length
543  lastHeight = height
544  blocks.append((lastStartTime,lastTimeLength,lastHeight))
545 
546  return blocks
547 
548 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Draw concurrency bars above a stream's lowest row and/or record them.

    ``points`` are +1/-1 step Points whose running sum is the number of
    things active on ``stream``.  Only stretches where that sum is at least
    streamHeightCut are used.
    - doPlot: draw each stretch on ``ax``, stacked above the first row.
    - addToStackTimes: add its (start, length) to allStackTimes[color],
      repeated (level - threadOffset) times.
    """
    steps = reduceSortedPoints(sorted(points, key=attrgetter('x')))

    # We make a cut here because the first row for each stream was already
    # plotted and counted in allStackTimes.  We want to avoid double counting
    # or missing running modules.
    level = 0
    segments = []
    for current, following in zip(steps, steps[1:]):
        level += current.y
        if level >= streamHeightCut:
            segments.append((current.x, following.x - current.x, level))
    segments = mergeContiguousBlocks(sorted(segments, key=itemgetter(2)))

    for nthreads, group in groupby(segments, itemgetter(2)):
        intervals = [(begin, length) for begin, length, _ in group]
        if doPlot:
            inSeconds = [(begin/1000000., length/1000000.) for begin, length in intervals]
            ax.broken_barh(inSeconds, (stream-0.4+height, height*(nthreads-1)),
                           facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(intervals*(nthreads-threadOffset))
576 
577 #----------------------------------------------
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
    """Draw the per-stream stall graph (and optionally a stack plot) and save it.

    processingSteps is an iterable of (name, transition, stream, time,
    isEvent) tuples; times are divided by 1e6 for the seconds axis.

    The lowest row of every stream is colored by what was running:
    - green: modules running event;
    - limegreen: modules running other;
    - red: a module listed in stalledModuleInfo;
    - orange: reading from the source;
    - darkviolet: only external work.
    Rows above it show additional concurrently running work (blue).

    Parameters:
      pdfFile             -- output file name passed to plt.savefig.
      shownStacks         -- also draw a stacked summary plot underneath.
      numStreams          -- number of stream rows.
      stalledModuleInfo   -- mapping keyed by stalled module name, e.g. the
                             result of findStalledModules.
      displayExternalWork -- track and label the time between a module's
                             acquire finishing and the module starting.
      checkOrder          -- skip transitions that arrive in the wrong order
                             (e.g. a finish before its start).
      setXAxis, xLower, xUpper -- when setXAxis is true, fix the x range
                             (seconds) to (xLower, xUpper).

    Uses the module-level name ``plt`` (matplotlib.pyplot), which the script
    imports only when a graph is requested.
    """
    # Iterating a mapping yields its keys on Python 2 and 3 alike;
    # dict.iterkeys() no longer exists on Python 3.
    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [set() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                # Use a separate name so the transition's own 'n' is not clobbered.
                for runningName in moduleNames:
                    if runningName in stalledModuleNames:
                        c="red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))
    if (setXAxis):
        ax.set_xlim((xLower, xUpper))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
        colors=[x.color for x in lowestRow]
        # for each stream, plot the lowest row
        ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            stackHeight = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                stackHeight += p1.y
                xs.append((p1.x, p2.x-p1.x, stackHeight))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for blockHeight, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, blockHeight), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        # Boolean, not the deprecated 'off' string (a non-empty string is truthy).
        axStack.tick_params(top=False)

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
    plt.close(fig)
819 
820 #=======================================
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
                        by itself, the default file name is \'stall.pdf\'. Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary as a json file module-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = args.timings

    setXAxis = args.upperxaxis is not None
    xUpper = args.upperxaxis if setXAxis else 0.0
    xLower = args.lowerxaxis

    doGraphic = pdfFile is not None
    if doGraphic:
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            # A throw-away figure is needed only to ask the canvas which
            # formats it can write; close it so it does not linger.
            probeFigure = plt.figure()
            supported_filetypes = probeFigure.canvas.get_supported_filetypes()
            plt.close(probeFigure)
            if not extension in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print("   '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
edmStreamStallGrapher.StreamInfoElement.color
color
Definition: edmStreamStallGrapher.py:496
resolutioncreator_cfi.object
object
Definition: resolutioncreator_cfi.py:4
dqmMemoryStats.float
float
Definition: dqmMemoryStats.py:127
edmStreamStallGrapher.StallMonitorParser.numStreams
numStreams
Definition: edmStreamStallGrapher.py:163
edmStreamStallGrapher.TracerParser.maxNameSize
maxNameSize
Definition: edmStreamStallGrapher.py:273
edmStreamStallGrapher.StallMonitorParser.processingSteps
def processingSteps(self)
Definition: edmStreamStallGrapher.py:170
edmStreamStallGrapher.StallMonitorParser._f
_f
Definition: edmStreamStallGrapher.py:160
edmStreamStallGrapher.Point.x
x
Definition: edmStreamStallGrapher.py:440
edmStreamStallGrapher.StallMonitorParser.maxNameSize
maxNameSize
Definition: edmStreamStallGrapher.py:165
edmStreamStallGrapher.Point.__repr__
def __repr__(self)
Definition: edmStreamStallGrapher.py:446
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
edmStreamStallGrapher.getTime
def getTime(line)
Definition: edmStreamStallGrapher.py:179
edmStreamStallGrapher.createPDFImage
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
Definition: edmStreamStallGrapher.py:578
edmStreamStallGrapher.printStalledModulesInOrder
def printStalledModulesInOrder(stalledModules)
Definition: edmStreamStallGrapher.py:416
edmStreamStallGrapher.StallMonitorParser.__init__
def __init__(self, f)
Definition: edmStreamStallGrapher.py:140
edmStreamStallGrapher.StreamInfoElement.unpack
def unpack(self)
Definition: edmStreamStallGrapher.py:498
edmStreamStallGrapher.Point.__str__
def __str__(self)
Definition: edmStreamStallGrapher.py:443
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
edmStreamStallGrapher.StreamInfoElement.begin
begin
Definition: edmStreamStallGrapher.py:494
edmStreamStallGrapher.Point
Definition: edmStreamStallGrapher.py:438
edmStreamStallGrapher.StallMonitorParser
Definition: edmStreamStallGrapher.py:139
str
#define str(s)
Definition: TestProcessor.cc:51
edmStreamStallGrapher.createModuleTiming
def createModuleTiming(processingSteps, numStreams)
Definition: edmStreamStallGrapher.py:349
edmStreamStallGrapher.Point.y
y
Definition: edmStreamStallGrapher.py:441
edmStreamStallGrapher.readLogFile
def readLogFile(inputFile)
Definition: edmStreamStallGrapher.py:301
edmStreamStallGrapher.createAsciiImage
def createAsciiImage(processingSteps, numStreams, maxNameSize)
Definition: edmStreamStallGrapher.py:372
edmStreamStallGrapher.Stack.update
def update(self, graphType, points)
Definition: edmStreamStallGrapher.py:481
edmStreamStallGrapher.Point.__init__
def __init__(self, x_, y_)
Definition: edmStreamStallGrapher.py:439
edmStreamStallGrapher.Stack.__init__
def __init__(self)
Definition: edmStreamStallGrapher.py:478
edmStreamStallGrapher.plotPerStreamAboveFirstAndPrepareStack
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
Definition: edmStreamStallGrapher.py:549
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
edmStreamStallGrapher.TracerParser
Definition: edmStreamStallGrapher.py:271
PVValHelper::add
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
Definition: PVValidationHelpers.cc:12
mps_setup.append
append
Definition: mps_setup.py:85
edmStreamStallGrapher.TracerParser.processingSteps
def processingSteps(self)
Definition: edmStreamStallGrapher.py:274
edmStreamStallGrapher.StreamInfoElement
Definition: edmStreamStallGrapher.py:492
createfilelist.int
int
Definition: createfilelist.py:10
edmStreamStallGrapher.StreamInfoElement.delta
delta
Definition: edmStreamStallGrapher.py:495
edmStreamStallGrapher.parseTracerOutput
def parseTracerOutput(f)
Definition: edmStreamStallGrapher.py:209
ComparisonHelper::zip
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
Definition: L1TStage2CaloLayer1.h:41
edmStreamStallGrapher.Stack.data
data
Definition: edmStreamStallGrapher.py:479
edmStreamStallGrapher.mergeContiguousBlocks
def mergeContiguousBlocks(blocks)
Definition: edmStreamStallGrapher.py:528
edmStreamStallGrapher.TracerParser.__init__
def __init__(self, f)
Definition: edmStreamStallGrapher.py:272
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
edmStreamStallGrapher.chooseParser
def chooseParser(inputFile)
Definition: edmStreamStallGrapher.py:278
edmStreamStallGrapher.processingStepsFromStallMonitorOutput
def processingStepsFromStallMonitorOutput(f, moduleNames)
Definition: edmStreamStallGrapher.py:72
edmStreamStallGrapher.Stack
Definition: edmStreamStallGrapher.py:477
format
BeamSpotPI::unpack
std::pair< unsigned int, unsigned int > unpack(cond::Time_t since)
Definition: BeamSpotPayloadInspectorHelper.h:22
edmStreamStallGrapher.adjacentDiff
def adjacentDiff(*pairLists)
Definition: edmStreamStallGrapher.py:466
edmStreamStallGrapher.StreamInfoElement.__init__
def __init__(self, begin_, delta_, color_)
Definition: edmStreamStallGrapher.py:493
edmStreamStallGrapher.printHelp
def printHelp()
Definition: edmStreamStallGrapher.py:10
beamvalidation.exit
def exit(msg="")
Definition: beamvalidation.py:53
edmStreamStallGrapher.reduceSortedPoints
def reduceSortedPoints(ps)
Definition: edmStreamStallGrapher.py:450
edmStreamStallGrapher.findStalledModules
def findStalledModules(processingSteps, numStreams)
Definition: edmStreamStallGrapher.py:316
edmStreamStallGrapher.StallMonitorParser._moduleNames
_moduleNames
Definition: edmStreamStallGrapher.py:164
edmStreamStallGrapher.consolidateContiguousBlocks
def consolidateContiguousBlocks(numStreams, streamInfo)
Definition: edmStreamStallGrapher.py:504