CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 from builtins import range
4 from itertools import groupby
5 from operator import attrgetter,itemgetter
6 import sys
7 from collections import defaultdict
8 #----------------------------------------------
def printHelp():
    """Return the multi-line usage/help text.

    The text is used as the epilog of the argparse command line help.
    """
    s = '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
  stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

  After running the job, execute this script and pass the name of the
  StallMonitor log file to the script.

  By default, the script will then print an 'ASCII art' stall graph
  which consists of a line of text for each time a module or the
  source stops or starts. Each line contains the name of the module
  which either started or stopped running, and the number of modules
  running on each stream at that moment in time. After that will be
  the time and stream number. Then if a module just started, you
  will also see the amount of time the module spent between finishing
  its prefetching and starting. The state of a module is represented
  by a symbol:

    plus  ("+") the stream has just finished waiting and is starting a module
    minus ("-") the stream just finished running a module

  If a module had to wait more than 0.1 seconds, the end of the line
  will have "STALLED". Startup actions, e.g. reading conditions,
  may affect results for the first few events.

  Using the command line arguments described above you can make the
  program create a PDF file with actual graphs instead of the 'ASCII art'
  output.

  Once the graph is completed, the program outputs the list of modules
  which had the greatest total stall times. The list is sorted by
  total stall time and written in descending order. In addition, the
  list of all stall times for the module is given.

  There is an inferior alternative (an old obsolete way).
  Instead of using the StallMonitor Service, you can use the
  Tracer Service. Make sure to use the 'printTimestamps' option
  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
  There are problems associated with this and it is not recommended.'''
    return s
51 
kStallThreshold=100000 #in microseconds
kTracerInput=False  # set to True by chooseParser() when the input is Tracer output

#Stream states
# Transition codes yielded by the parsers as the second element of each
# (name, transition, stream, time, isEvent) tuple.
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
# Pseudo module names used for source-related transitions.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"

kTimeFuzz = 1000 # in microseconds
71 
72 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Generator over a StallMonitor log file.

    Yields one (name, transition, stream, time, isEvent) tuple per
    recognized log line, where `transition` is one of the kStarted...
    constants, `stream` is the stream id, `time` is microseconds since
    begin job (last payload field), and `isEvent` tells whether the
    transition belongs to event processing.

    f             -- iterable of log lines (open StallMonitor file)
    moduleNames   -- dict: module id (string) -> module label
    esModuleNames -- dict: ES module id (string) -> ES module label
    """
    for rawl in f:
        l = rawl.strip()
        # skip blank lines and '#' header/comment lines
        if not l or l[0] == '#':
            continue
        (step,payload) = tuple(l.split(None,1))
        payload=payload.split()

        # Ignore these
        if step == 'E' or step == 'e':
            continue

        # Payload format is:
        # <stream id> <..other fields..> <time since begin job>
        stream = int(payload[0])
        time = int(payload[-1])
        trans = None
        isEvent = True

        name = None
        # 'S' = begin of event creation in source
        # 's' = end of event creation in source
        if step == 'S' or step == 's':
            name = kSourceFindEvent
            trans = kStartedSource
            # The start of an event is the end of the framework part
            if step == 's':
                trans = kFinishedSource
        else:
            # moduleID is the second payload argument for all steps below
            moduleID = payload[1]

            # 'p' = end of module prefetching
            # 'M' = begin of module processing
            # 'm' = end of module processing
            if step == 'p' or step == 'M' or step == 'm':
                trans = kStarted
                if step == 'p':
                    trans = kPrefetchEnd
                elif step == 'm':
                    trans = kFinished
                if step == 'm' or step == 'M':
                    # payload[2] is the transition type; 0 means an Event
                    isEvent = (int(payload[2]) == 0)
                name = moduleNames[moduleID]

            # 'q' = end of esmodule prefetching
            # 'N' = begin of esmodule processing
            # 'n' = end of esmodule processing
            if step == 'q' or step == 'N' or step == 'n':
                trans = kStarted
                if step == 'q':
                    trans = kPrefetchEnd
                elif step == 'n':
                    trans = kFinished
                if step == 'n' or step == 'N':
                    isEvent = (int(payload[2]) == 0)
                name = esModuleNames[moduleID]

            # 'A' = begin of module acquire function
            # 'a' = end of module acquire function
            elif step == 'A' or step == 'a':
                trans = kStartedAcquire
                if step == 'a':
                    trans = kFinishedAcquire
                name = moduleNames[moduleID]

            # Delayed read from source
            # 'R' = begin of delayed read from source
            # 'r' = end of delayed read from source
            elif step == 'R' or step == 'r':
                trans = kStartedSourceDelayedRead
                if step == 'r':
                    trans = kFinishedSourceDelayedRead
                name = kSourceDelayedRead

        # unrecognized step letters fall through with trans None and are skipped
        if trans is not None:
            yield (name,trans,stream,time, isEvent)

    return
152 
154  def __init__(self,f):
155  numStreams = 0
156  numStreamsFromSource = 0
157  moduleNames = {}
158  esModuleNames = {}
159  for rawl in f:
160  l = rawl.strip()
161  if l and l[0] == 'M':
162  i = l.split(' ')
163  if i[3] == '4':
164  #found global begin run
165  numStreams = int(i[1])+1
166  break
167  if numStreams == 0 and l and l[0] == 'S':
168  s = int(l.split(' ')[1])
169  if s > numStreamsFromSource:
170  numStreamsFromSource = s
171  if len(l) > 5 and l[0:2] == "#M":
172  (id,name)=tuple(l[2:].split())
173  moduleNames[id] = name
174  continue
175  if len(l) > 5 and l[0:2] == "#N":
176  (id,name)=tuple(l[2:].split())
177  esModuleNames[id] = name
178  continue
179 
180  self._f = f
181  if numStreams == 0:
182  numStreams = numStreamsFromSource +2
183  self.numStreams =numStreams
184  self._moduleNames = moduleNames
185  self._esModuleNames = esModuleNames
186  self.maxNameSize =0
187  for n in moduleNames.items():
188  self.maxNameSize = max(self.maxNameSize,len(n))
189  for n in esModuleNames.items():
190  self.maxNameSize = max(self.maxNameSize,len(n))
191  self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
192 
193  def processingSteps(self):
194  """Create a generator which can step through the file and return each processing step.
195  Using a generator reduces the memory overhead when parsing a large file.
196  """
197  self._f.seek(0)
199 
200 #----------------------------------------------
201 # Utility to get time out of Tracer output text format
def getTime(line):
    """Extract the 'hh:mm:ss.f' timestamp found in the second
    space-separated field of *line* and return it as an integer number
    of microseconds."""
    stamp = line.split(" ")[1]
    hours, minutes, seconds = stamp.split(":")
    totalSeconds = int(hours)*60*60 + int(minutes)*60 + float(seconds)
    # convert to microseconds
    return int(1000000*totalSeconds)
208 
209 #----------------------------------------------
210 # The next function parses the Tracer output.
211 # Here are some differences to consider if you use Tracer output
212 # instead of the StallMonitor output.
213 # - The time in the text of the Tracer output is not as precise
214 # as the StallMonitor (.01 s vs .001 s)
215 # - The MessageLogger bases the time on when the message printed
216 # and not when it was initially queued up to print which smears
217 # the accuracy of the times.
218 # - Both of the previous things can produce some strange effects
219 # in the output plots.
220 # - The file size of the Tracer text file is much larger.
221 # - The CPU work needed to parse the Tracer files is larger.
222 # - The Tracer log file is expected to have "++" in the first
223 # or fifth line. If there are extraneous lines at the beginning
224 # you have to remove them.
225 # - The ascii printout will have one extraneous line
226 # near the end for the SourceFindEvent start.
227 # - The only advantage I can see is that you have only
228 # one output file to handle instead of two, the regular
229 # log file and the StallMonitor output.
230 # We should probably just delete the Tracer option because it is
231 # clearly inferior ...
233  processingSteps = []
234  numStreams = 0
235  maxNameSize = 0
236  startTime = 0
237  streamsThatSawFirstEvent = set()
238  for l in f:
239  trans = None
240  # We estimate the start and stop of the source
241  # by the end of the previous event and start of
242  # the event. This is historical, probably because
243  # the Tracer output for the begin and end of the
244  # source event does not include the stream number.
245  if l.find("processing event :") != -1:
246  name = kSourceFindEvent
247  trans = kStartedSource
248  # the end of the source is estimated using the start of the event
249  if l.find("starting:") != -1:
250  trans = kFinishedSource
251  elif l.find("processing event for module") != -1:
252  trans = kStarted
253  if l.find("finished:") != -1:
254  if l.find("prefetching") != -1:
255  trans = kPrefetchEnd
256  else:
257  trans = kFinished
258  else:
259  if l.find("prefetching") != -1:
260  #skip this since we don't care about prefetch starts
261  continue
262  name = l.split("'")[1]
263  elif l.find("processing event acquire for module:") != -1:
264  trans = kStartedAcquire
265  if l.find("finished:") != -1:
266  trans = kFinishedAcquire
267  name = l.split("'")[1]
268  elif l.find("event delayed read from source") != -1:
269  trans = kStartedSourceDelayedRead
270  if l.find("finished:") != -1:
271  trans = kFinishedSourceDelayedRead
272  name = kSourceDelayedRead
273  if trans is not None:
274  time = getTime(l)
275  if startTime == 0:
276  startTime = time
277  time = time - startTime
278  streamIndex = l.find("stream = ")
279  stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
280  maxNameSize = max(maxNameSize, len(name))
281 
282  if trans == kFinishedSource and not stream in streamsThatSawFirstEvent:
283  # This is wrong but there is no way to estimate the time better
284  # because there is no previous event for the first event.
285  processingSteps.append((name,kStartedSource,stream,time,True))
286  streamsThatSawFirstEvent.add(stream)
287 
288  processingSteps.append((name,trans,stream,time, True))
289  numStreams = max(numStreams, stream+1)
290 
291  f.close()
292  return (processingSteps,numStreams,maxNameSize)
293 
295  def __init__(self,f):
296  self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)
297  def processingSteps(self):
298  return self._processingSteps
299 
300 #----------------------------------------------
def chooseParser(inputFile):
    """Inspect the first and fifth lines of *inputFile* to decide which
    parser class understands its format, rewind the file, and return the
    class (StallMonitorParser or TracerParser). Exits the program when
    neither format is recognized. Sets the module-global kTracerInput
    flag when Tracer output is detected."""
    firstLine = inputFile.readline().rstrip()
    for _ in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning

    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if ("++" in firstLine) or ("++" in fifthLine):
        global kTracerInput
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    exit(1)
322 
323 #----------------------------------------------
def readLogFile(inputFile):
    """Select the parser class matching *inputFile*'s format (see
    chooseParser) and return a parser instance built from the file."""
    parserClass = chooseParser(inputFile)
    return parserClass(inputFile)
327 
328 #----------------------------------------------
329 #
330 # modules: The time between prefetch finished and 'start processing' is
331 # the time it took to acquire any resources which is by definition the
332 # stall time.
333 #
334 # source: The source just records how long it spent doing work,
335 # not how long it was stalled. We can get a lower bound on the stall
336 # time for delayed reads by measuring the time the stream was doing
337 # no work up till the start of the source delayed read.
338 #
def findStalledModules(processingSteps, numStreams):
    """Collect, per module, every wait longer than kStallThreshold.

    A module's stall is the time between the end of its prefetching and
    the start of its (acquire or regular) processing. For the source
    delayed read, a lower bound is the idle time of the stream before
    the read starts.

    Returns {module name: [stall times in microseconds]}.
    """
    lastFinishTime = [0]*numStreams
    runningCount = [0]*numStreams
    stalled = {}
    prefetchDonePerStream = [{} for _ in range(numStreams)]
    for name, transition, stream, timestamp, isEvent in processingSteps:

        stallTime = None
        prefetchDone = prefetchDonePerStream[stream]
        if transition == kPrefetchEnd:
            # remember when this module became ready to run
            prefetchDone[name] = timestamp
        elif transition in (kStarted, kStartedAcquire):
            if name in prefetchDone:
                stallTime = timestamp - prefetchDone[name]
                prefetchDone.pop(name, None)
            runningCount[stream] += 1
        elif transition in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastFinishTime[stream] = timestamp
        elif transition == kStartedSourceDelayedRead:
            # stream was idle up to the delayed read: count it as a stall
            if runningCount[stream] == 0:
                stallTime = timestamp - lastFinishTime[stream]
        elif transition == kStartedSource:
            prefetchDone.clear()
        elif transition in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinishTime[stream] = timestamp
        if stallTime is not None and stallTime > kStallThreshold:
            stalled.setdefault(name, []).append(stallTime)
    return stalled
370 
371 
def createModuleTiming(processingSteps, numStreams):
    """Measure, for event transitions only, how long each module ran
    (start to finish, in milliseconds) and write the resulting
    {module label: [durations]} dictionary to 'module-timings.json'."""
    import json
    streamBusy = [0]*numStreams
    startTimesPerStream = [defaultdict(int) for _ in range(numStreams)]
    durations = defaultdict(list)
    for name, transition, stream, timestamp, isEvent in processingSteps:
        if not isEvent:
            continue
        startTimes = startTimesPerStream[stream]
        if transition == kStarted:
            streamBusy[stream] = 1
            startTimes[name] = timestamp
        elif transition == kFinished:
            # defaultdict gives 0 if a finish arrives without a start
            elapsed = timestamp - startTimes[name]
            startTimes.pop(name, None)
            streamBusy[stream] = 0
            durations[name].append(float(elapsed/1000.))

    with open('module-timings.json', 'w') as outfile:
        outfile.write(json.dumps(durations, indent=4))
393 
394 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one line of 'ASCII art' per transition.

    Each printed line shows the module name, a '+' (start) or '-'
    (finish) marker, the number of running modules per stream, the time
    in milliseconds, the stream number, and - when a wait was measured -
    the wait time plus "STALLED" when it exceeds kStallThreshold.

    processingSteps -- iterable of (name, trans, stream, time, isEvent)
    numStreams      -- number of streams in the job
    maxNameSize     -- column width used for the module names
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    modulesActiveOnStreams = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        modulesActiveOnStream = modulesActiveOnStreams[s]
        if trans == kPrefetchEnd:
            # remember prefetch-finished time; nothing printed for this step
            modulesActiveOnStream[n] = time
            continue
        elif trans == kStartedAcquire or trans == kStarted:
            if n in modulesActiveOnStream:
                # wait = time between end of prefetch and actual start
                waitTime = time - modulesActiveOnStream[n]
                modulesActiveOnStream.pop(n, None)
            streamState[s] +=1
        elif trans == kFinishedAcquire or trans == kFinished:
            streamState[s] -=1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            # idle stream before a delayed read counts as waiting
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesActiveOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        states = "%-*s: " % (maxNameSize,n)
        if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
            states +="+ "
        else:
            states +="- "
        for index, state in enumerate(streamState):
            # '*' marks the stream whose source is currently active
            if n==kSourceFindEvent and index == s:
                states +="* "
            else:
                states +=str(state)+" "
        # time printed in milliseconds
        states += " -- " + str(time/1000.) + " " + str(s) + " "
        if waitTime is not None:
            states += " %.2f"% (waitTime/1000.)
            if waitTime > kStallThreshold:
                states += " STALLED"

        print(states)
437 
438 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    stalledModules -- {module name: [stall times in microseconds]}; each
    list is sorted in place (descending) as a side effect. Times are
    printed in milliseconds.
    """
    summary = []
    nameWidth = 0
    for moduleName, stallTimes in stalledModules.items():
        nameWidth = max(nameWidth, len(moduleName))
        stallTimes.sort(reverse=True)
        summary.append((moduleName, sum(stallTimes), stallTimes))

    # order rows by total stall time, worst offender first
    summary.sort(key=itemgetter(1), reverse=True)

    nameColumn = "Stalled Module"
    nameWidth = max(nameWidth, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallWidth = len(stallColumn)

    print("%-*s" % (nameWidth, nameColumn), "%-*s"%(stallWidth,stallColumn), " Stall Times")
    for moduleName, total, stallTimes in summary:
        paddedName = "%-*s:" % (nameWidth, moduleName)
        print(paddedName, "%-*.2f"%(stallWidth, total/1000.), ", ".join("%.2f"%(x/1000.) for x in stallTimes))
459 
460 #--------------------------------------------------------
class Point:
    """A simple 2D point with attributes x and y, used to build the
    stall graphs (x is a time, y is a +1/-1 concurrency change)."""

    def __init__(self, x_, y_):
        self.x = x_
        self.y = y_

    def __str__(self):
        return f"(x: {self.x}, y: {self.y})"

    def __repr__(self):
        return str(self)
471 
472 #--------------------------------------------------------
474  if len(ps) < 2:
475  return ps
476  reducedPoints = []
477  tmp = Point(ps[0].x, ps[0].y)
478  for p in ps[1:]:
479  if abs(tmp.x -p.x)<kTimeFuzz:
480  tmp.y += p.y
481  else:
482  reducedPoints.append(tmp)
483  tmp = Point(p.x, p.y)
484  reducedPoints.append(tmp)
485  reducedPoints = [p for p in reducedPoints if p.y != 0]
486  return reducedPoints
487 
488 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn any number of lists of (begin, length) pairs into a single
    x-sorted list of Points: +1 at each interval begin and -1 at each
    interval end. Zero-length intervals are dropped."""
    events = []
    for pairs in pairLists:
        nonEmpty = [p for p in pairs if p[1] != 0]
        # keep all the +1 points of a pair list ahead of its -1 points so
        # the stable sort preserves the same order for equal x values
        events.extend(Point(p[0], 1) for p in nonEmpty)
        events.extend(Point(sum(p), -1) for p in nonEmpty)
    events.sort(key=attrgetter('x'))
    return events
496 
stackType = 'stack'  # graph-type label; not referenced in the code visible here
498 
499 # --------------------------------------------
class Stack:
    """Accumulates the layers of the stacked ('stack') plot.

    data is a list of (graphType, points) tuples; each new layer is the
    merge of its own points with the points of the previous layer.
    """

    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Add a layer on top of the current stack.

        Note: *points* is extended in place with the previous layer's
        points before being sorted and reduced.
        """
        merged = points
        if self.data:
            merged += self.data[-1][1]

        merged.sort(key=attrgetter('x'))
        merged = reduceSortedPoints(merged)
        self.data.append((graphType, merged))
512 
513 #---------------------------------------------
514 # StreamInfoElement
516  def __init__(self, begin_, delta_, color_):
517  self.begin=begin_
518  self.delta=delta_
519  self.color=color_
520 
521  def unpack(self):
522  return self.begin, self.delta, self.color
523 
524 #----------------------------------------------
525 # Consolidating contiguous blocks with the same color
526 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge adjacent StreamInfoElements of the same color whose time
    ranges touch exactly (begin == previous begin + previous delta).
    This drastically reduces the size of the pdf file.

    Returns a new per-stream list of StreamInfoElements; the input
    lists are left unmodified.
    """
    mergedPerStream = [[] for _ in range(numStreams)]

    for s in range(numStreams):
        elements = streamInfo[s]
        if not elements:
            continue
        curStart, curLength, curColor = elements[0].unpack()
        for element in elements[1:]:
            begin, length, color = element.unpack()
            if color == curColor and curStart + curLength == begin:
                # same color and exactly contiguous: grow the current block
                curLength += length
            else:
                mergedPerStream[s].append(StreamInfoElement(curStart, curLength, curColor))
                curStart, curLength, curColor = begin, length, color
        # flush the block left open at the end of the stream
        mergedPerStream[s].append(StreamInfoElement(curStart, curLength, curColor))

    return mergedPerStream
546 
547 #----------------------------------------------
548 # Consolidating contiguous blocks with the same color drastically
549 # reduces the size of the pdf file. Same functionality as the
550 # previous function, but with slightly different implementation.
552  oldBlocks = blocks
553 
554  blocks = []
555  if not oldBlocks:
556  return blocks
557 
558  lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
559  for start,length,height in oldBlocks[1:]:
560  if height == lastHeight and abs(lastStartTime+lastTimeLength - start) < kTimeFuzz:
561  lastTimeLength += length
562  else:
563  blocks.append((lastStartTime,lastTimeLength,lastHeight))
564  lastStartTime = start
565  lastTimeLength = length
566  lastHeight = height
567  blocks.append((lastStartTime,lastTimeLength,lastHeight))
568 
569  return blocks
570 
571 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and/or collect the
    intervals for the stack plot.

    points          -- Points (+1/-1 concurrency changes) for one stream
    allStackTimes   -- dict color -> list of (begin, length) to extend
    ax              -- matplotlib axis used when doPlot is true
    stream          -- stream index (y position of the rows)
    height          -- height of one row in the plot
    streamHeightCut -- skip intervals whose concurrency is below this
    doPlot          -- draw broken_barh rows when true
    addToStackTimes -- extend allStackTimes[color] when true
    threadOffset    -- subtracted from the concurrency count when
                       replicating intervals into allStackTimes
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    # running sum of the +1/-1 values gives the concurrency level between
    # consecutive points
    for t1,t2 in zip(points, points[1:]):
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    # group intervals by concurrency level so each broken_barh call draws
    # one height only
    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            # scale from microseconds to seconds for the plot
            theTimes = [(t[0]/1000000.,t[1]/1000000.) for t in theTS]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            # replicate each interval once per concurrency level above the offset
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
599 
600 #----------------------------------------------
601 # The same ES module can have multiple Proxies running concurrently
602 # so we need to reference count the names of the active modules
class RefCountSet(set):
    """A set whose elements are reference counted.

    The same ES module can have multiple Proxies running concurrently,
    so an element added N times must be removed N times before it
    actually leaves the set.
    """

    def __init__(self):
        super().__init__()
        self.__counts = {}

    def add(self, item):
        """Insert *item*, bumping its reference count."""
        self.__counts[item] = self.__counts.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        """Drop one reference to *item*; only the last removal takes it
        out of the set. Raises KeyError if *item* is not present."""
        remaining = self.__counts[item] - 1
        if remaining:
            self.__counts[item] = remaining
        else:
            del self.__counts[item]
            super().remove(item)
618 
619 
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
    """Build the stall graph (and optionally the stack plot) and save it.

    pdfFile             -- output file name passed to plt.savefig
    shownStacks         -- draw the stacked '# modules' plot
    showStreams         -- draw the per-stream rows
    processingSteps     -- iterable of (name, trans, stream, time, isEvent)
    numStreams          -- number of streams
    stalledModuleInfo   -- iterable of stalled module names (stall rows red)
    displayExternalWork -- also show external-work (darkviolet) intervals
    checkOrder          -- detect/repair out-of-order transitions
    setXAxis            -- when true, clamp x axis to (xLower, xUpper)

    NOTE(review): relies on module-level names defined elsewhere in this
    file (plt, Point, StreamInfoElement, Stack, consolidateContiguousBlocks,
    plotPerStreamAboveFirstAndPrepareStack, reduceSortedPoints,
    adjacentDiff, mergeContiguousBlocks) - plt is presumably matplotlib's
    pyplot imported in __main__.
    """

    stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        # all times are plotted relative to the first transition
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # snapshot of what was already running before this start
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            # only emit a lowest-row interval when something was running
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                # module resumed after its external work finished
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        # external work runs between acquire-finish and start
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            # choose the color of the lowest-row interval:
            #  green      - event modules running
            #  limegreen  - non-event (e.g. run/lumi) modules running
            #  darkviolet - nothing running (external work only)
            #  orange     - source was active
            #  red        - a stalled module was running
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                for n in moduleNames:
                    if n in stalledModuleNames:
                        c="red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks and showStreams:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks and showStreams:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))
    if shownStacks and not showStreams:
        axStack = ax

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))
    if (setXAxis):
        ax.set_xlim((xLower, xUpper))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
        colors=[x.color for x in lowestRow]
        # for each stream, plot the lowest row
        if showStreams:
            ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            height = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)");
        axStack.set_ylabel("# modules");
        axStack.set_xlim(ax.get_xlim())
        axStack.tick_params(top='off')

    # legend drawn as free text at the top of the figure
    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
864 
865 #=======================================
if __name__=="__main__":
    import argparse
    import re
    # NOTE: `sys` is already imported at the top of the file; no local re-import needed.

    # ---- Command-line interface ------------------------------------------
    # The epilog is the long usage text produced by printHelp(); RawDescription
    # keeps its hand-formatted layout intact.
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
    by itself, the default file name is \'stall.pdf\'. Otherwise, the
    argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
    Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
    Can be used only when -g and -s are specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file modules-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # ---- Translate parsed options into plain local variables -------------
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external  # store_false: True unless -e was given
    checkOrder = args.order
    doModuleTimings = bool(args.timings)

    # The lower x-axis bound is honored only when an upper bound was
    # explicitly requested; otherwise the plot auto-scales.
    setXAxis = args.upperxaxis is not None
    xUpper = args.upperxaxis if setXAxis else 0.0
    xLower = args.lowerxaxis

    doGraphic = pdfFile is not None
    if doGraphic:
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        # Restrict the output name to word characters and dots; `exit` here is
        # the file-local helper, not the builtin.
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            exit(1)

        if '.' in pdfFile:
            # Verify the requested extension is one matplotlib can write.
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                exit(1)

    # ---- Option sanity checks --------------------------------------------
    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        exit(1)

    # ---- Read the log and produce the requested output -------------------
    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        # Tracer output may contain out-of-order transitions; always repair.
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
std::pair< unsigned int, unsigned int > unpack(cond::Time_t since)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
def createModuleTiming(processingSteps, numStreams)
def processingStepsFromStallMonitorOutput(f, moduleNames, esModuleNames)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
void add(std::map< std::string, TH1 *> &h, TH1 *hist)
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
#define str(s)
def exit(msg="")