CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 from builtins import range
4 from itertools import groupby
5 from operator import attrgetter,itemgetter
6 import sys
7 from collections import defaultdict
8 #----------------------------------------------
def printHelp():
    """Return the usage notes that are shown as the epilog of the command line help."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
51 
# A wait longer than this is reported as a stall.
kStallThreshold = 100000  # in microseconds
# Set to True by chooseParser() when the input is Tracer output.
kTracerInput = False

# Stream states (transition codes carried in every processing step tuple)
kStarted = 0
kFinished = 1
kPrefetchEnd = 2
kStartedAcquire = 3
kFinishedAcquire = 4
kStartedSource = 5
kFinishedSource = 6
kStartedSourceDelayedRead = 7
kFinishedSourceDelayedRead = 8

# Special names (pseudo-module labels used for source activity)
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"

# Times closer together than this are treated as simultaneous when merging.
kTimeFuzz = 1000  # in microseconds
71 
72 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Yield one processing step for each relevant line of a StallMonitor log.

    f is iterated line by line. moduleNames and esModuleNames map the module
    ID found in the second payload field to the module label.
    Every yielded item is the tuple (name, trans, stream, time, isEvent).
    Blank lines, lines starting with '#', the steps 'E', 'e', 'F', 'f' and any
    step code that is not recognized produce no output.
    """
    for rawLine in f:
        line = rawLine.strip()
        if not line or line[0] == '#':
            continue
        (step, rest) = tuple(line.split(None, 1))
        fields = rest.split()

        # Ignore these
        if step in ('E', 'e', 'F', 'f'):
            continue

        # Payload format is:
        #  <stream id> <..other fields..> <time since begin job>
        stream = int(fields[0])
        time = int(fields[-1])
        trans = None
        isEvent = True
        name = None

        if step in ('S', 's'):
            # 'S' = begin of event creation in source
            # 's' = end of event creation in source
            # The start of an event is the end of the framework part
            name = kSourceFindEvent
            trans = kFinishedSource if step == 's' else kStartedSource
        else:
            # moduleID is the second payload argument for all steps below
            moduleID = fields[1]

            if step in ('p', 'M', 'm'):
                # 'p' = end of module prefetching
                # 'M' = begin of module processing
                # 'm' = end of module processing
                if step == 'p':
                    trans = kPrefetchEnd
                elif step == 'm':
                    trans = kFinished
                else:
                    trans = kStarted
                if step != 'p':
                    # third payload field is 0 for event transitions
                    isEvent = (int(fields[2]) == 0)
                name = moduleNames[moduleID]
            elif step in ('q', 'N', 'n'):
                # 'q' = end of esmodule prefetching
                # 'N' = begin of esmodule processing
                # 'n' = end of esmodule processing
                if step == 'q':
                    trans = kPrefetchEnd
                elif step == 'n':
                    trans = kFinished
                else:
                    trans = kStarted
                if step != 'q':
                    isEvent = (int(fields[2]) == 0)
                name = esModuleNames[moduleID]
            elif step in ('A', 'a'):
                # 'A' = begin of module acquire function
                # 'a' = end of module acquire function
                trans = kFinishedAcquire if step == 'a' else kStartedAcquire
                name = moduleNames[moduleID]
            elif step in ('R', 'r'):
                # 'R' = begin of delayed read from source
                # 'r' = end of delayed read from source
                trans = kFinishedSourceDelayedRead if step == 'r' else kStartedSourceDelayedRead
                name = kSourceDelayedRead

        if trans is not None:
            yield (name, trans, stream, time, isEvent)

    return
152 
class StallMonitorParser(object):
    """Reader for a StallMonitor log file.

    The constructor scans the file once to collect the module label tables,
    the number of streams and the widest label; processingSteps() then
    re-reads the file lazily.
    """

    def __init__(self,f):
        """Scan the seekable text file f for its header information."""
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        frameworkTransitions = False
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    # found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                # remember the largest stream id reported by the source
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID, name) = tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleID, name) = tuple(l[2:].split())
                esModuleNames[moduleID] = name
                continue
            if len(l) > 40 and l[0:24] == "# preFrameworkTransition":
                frameworkTransitions = True

        self._f = f
        if numStreams == 0:
            # no global begin run was seen: fall back on the source lines
            numStreams = numStreamsFromSource + 2
        self.numStreams = numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        # Width of the longest label. Measure the labels themselves, not the
        # (id, label) pairs, whose length is always 2.
        self.maxNameSize = 0
        for name in moduleNames.values():
            self.maxNameSize = max(self.maxNameSize, len(name))
        for name in esModuleNames.values():
            self.maxNameSize = max(self.maxNameSize, len(name))
        self.maxNameSize = max(self.maxNameSize, len(kSourceDelayedRead))
        if frameworkTransitions:
            self.maxNameSize = max(self.maxNameSize, len('streamBeginLumi'))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f, self._moduleNames, self._esModuleNames)
204 
205 #----------------------------------------------
206 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the timestamp of a Tracer output line in microseconds.

    The second space-separated token of line is read as HH:MM:SS(.fraction).
    """
    stamp = line.split(" ")[1]
    parts = stamp.split(":")
    seconds = int(parts[0])*60*60 + int(parts[1])*60 + float(parts[2])
    # convert to microseconds
    return int(1000000*seconds)
213 
214 #----------------------------------------------
215 # The next function parses the Tracer output.
216 # Here are some differences to consider if you use Tracer output
217 # instead of the StallMonitor output.
218 # - The time in the text of the Tracer output is not as precise
219 # as the StallMonitor (.01 s vs .001 s)
220 # - The MessageLogger bases the time on when the message printed
221 # and not when it was initially queued up to print which smears
222 # the accuracy of the times.
223 # - Both of the previous things can produce some strange effects
224 # in the output plots.
225 # - The file size of the Tracer text file is much larger.
226 # - The CPU work needed to parse the Tracer files is larger.
227 # - The Tracer log file is expected to have "++" in the first
228 # or fifth line. If there are extraneous lines at the beginning
229 # you have to remove them.
230 # - The ASCII printout will have one extraneous line
231 # near the end for the SourceFindEvent start.
232 # - The only advantage I can see is that you have only
233 # one output file to handle instead of two, the regular
234 # log file and the StallMonitor output.
235 # We should probably just delete the Tracer option because it is
236 # clearly inferior ...
def parseTracerOutput(f):
    """Parse Tracer service output into a list of processing steps.

    f is an open text file; it is read to the end and then closed.
    Returns (processingSteps, numStreams, maxNameSize) where processingSteps
    is a list of (name, trans, stream, time, isEvent) tuples, time being in
    microseconds relative to the first recognized line, and isEvent is
    always True.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    # None until the first timestamp is seen, so that a first timestamp of
    # exactly 00:00:00 is not mistaken for "not set yet".
    startTime = None
    streamsThatSawFirstEvent = set()
    for l in f:
        trans = None
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if "processing event :" in l:
            name = kSourceFindEvent
            trans = kStartedSource
            # the end of the source is estimated using the start of the event
            if "starting:" in l:
                trans = kFinishedSource
        elif "processing event for module" in l:
            trans = kStarted
            if "finished:" in l:
                if "prefetching" in l:
                    trans = kPrefetchEnd
                else:
                    trans = kFinished
            else:
                if "prefetching" in l:
                    # skip this since we don't care about prefetch starts
                    continue
            name = l.split("'")[1]
        elif "processing event acquire for module:" in l:
            trans = kStartedAcquire
            if "finished:" in l:
                trans = kFinishedAcquire
            name = l.split("'")[1]
        elif "event delayed read from source" in l:
            trans = kStartedSourceDelayedRead
            if "finished:" in l:
                trans = kFinishedSourceDelayedRead
            name = kSourceDelayedRead
        if trans is not None:
            time = getTime(l)
            if startTime is None:
                startTime = time
            time = time - startTime
            streamIndex = l.find("stream = ")
            stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
            maxNameSize = max(maxNameSize, len(name))

            if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
                # This is wrong but there is no way to estimate the time better
                # because there is no previous event for the first event.
                processingSteps.append((name,kStartedSource,stream,time,True))
                streamsThatSawFirstEvent.add(stream)

            processingSteps.append((name,trans,stream,time, True))
            numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps,numStreams,maxNameSize)
298 
class TracerParser(object):
    """Reader for Tracer service output; the whole file is parsed on construction."""

    def __init__(self,f):
        """Parse the open text file f with parseTracerOutput() and keep the results."""
        self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)

    def processingSteps(self):
        """Return the list of processing steps collected by the constructor."""
        return self._processingSteps
304 
305 #----------------------------------------------
def chooseParser(inputFile):
    """Return the parser class matching the format of inputFile.

    The first and fifth lines of the seekable text file are inspected and the
    file is rewound afterwards. StallMonitorParser is returned when the first
    line contains "# Transition" or "# Step"; TracerParser (and the module
    flag kTracerInput set to True) when either inspected line contains "++".
    For any other content the file is closed, a message is printed and the
    program exits with status 1.
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    for i in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning

    if "# Transition" in firstLine or "# Step" in firstLine:
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit rather than the interactive helper exit(), which is only
    # installed by the site module.
    sys.exit(1)
327 
328 #----------------------------------------------
def readLogFile(inputFile):
    """Pick the parser class for inputFile and return an instance built from the file."""
    parserClass = chooseParser(inputFile)
    reader = parserClass(inputFile)
    return reader
332 
333 #----------------------------------------------
334 #
335 # modules: The time between prefetch finished and 'start processing' is
336 # the time it took to acquire any resources which is by definition the
337 # stall time.
338 #
339 # source: The source just records how long it spent doing work,
340 # not how long it was stalled. We can get a lower bound on the stall
341 # time for delayed reads by measuring the time the stream was doing
342 # no work up till the start of the source delayed read.
343 #
def findStalledModules(processingSteps, numStreams):
    """Collect the waits that exceed kStallThreshold, grouped by module name.

    processingSteps is an iterable of (name, trans, stream, time, isEvent)
    tuples. Returns a dict mapping a name to the list of its wait times, in
    the order they were encountered.
    """
    lastWorkEnd = [0]*numStreams     # per stream: time the last work finished
    runningCount = [0]*numStreams    # per stream: modules currently running
    stalledModules = {}
    prefetchEndTimes = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:

        waitTime = None
        pending = prefetchEndTimes[s]
        if trans == kPrefetchEnd:
            pending[n] = time
        elif trans in (kStarted, kStartedAcquire):
            if n in pending:
                # time between end of prefetching and the actual start
                waitTime = time - pending.pop(n)
            runningCount[s] += 1
        elif trans in (kFinished, kFinishedAcquire):
            runningCount[s] -= 1
            lastWorkEnd[s] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[s] == 0:
                # the stream was idle since the last work ended
                waitTime = time - lastWorkEnd[s]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastWorkEnd[s] = time

        if waitTime is not None and waitTime > kStallThreshold:
            stalledModules.setdefault(n, []).append(waitTime)
    return stalledModules
375 
376 
def createModuleTiming(processingSteps, numStreams):
    """Write the run time of every event-level module call to 'module-timings.json'.

    processingSteps is an iterable of (name, trans, stream, time, isEvent)
    tuples with time in microseconds. Only steps with isEvent true are used.
    The file maps each module name to the list of its run times in
    milliseconds; a finish without a recorded start is measured from time 0.
    """
    import json
    moduleTimings = defaultdict(list)
    # per stream: start time of each module currently running
    startTimes = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        startedOnStream = startTimes[s]
        if not isEvent:
            continue
        if trans == kStarted:
            startedOnStream[n] = time
        elif trans == kFinished:
            elapsed = time - startedOnStream.pop(n, 0)
            moduleTimings[n].append(elapsed/1000.)

    with open('module-timings.json', 'w') as outfile:
        json.dump(moduleTimings, outfile, indent=4)
398 
399 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one text line per start/finish transition (the 'ASCII art' graph).

    Each line holds the name padded to maxNameSize, '+' for a start or '-'
    otherwise, the number of running modules on every stream ('*' marks the
    stream of a source event lookup), the time in milliseconds and the
    stream number. When a wait time is known it is appended in milliseconds,
    followed by ' STALLED' if it exceeds kStallThreshold.
    Prefetch-end transitions are recorded but not printed.
    """
    lastWorkEnd = [0]*numStreams
    runningCount = [0]*numStreams
    prefetchEndTimes = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        pending = prefetchEndTimes[s]
        if trans == kPrefetchEnd:
            pending[n] = time
            continue
        elif trans in (kStartedAcquire, kStarted):
            if n in pending:
                waitTime = time - pending.pop(n)
            runningCount[s] += 1
        elif trans in (kFinishedAcquire, kFinished):
            runningCount[s] -= 1
            lastWorkEnd[s] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[s] == 0:
                waitTime = time - lastWorkEnd[s]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastWorkEnd[s] = time

        isStart = trans in (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
        pieces = ["%-*s: " % (maxNameSize,n), "+ " if isStart else "- "]
        for index, count in enumerate(runningCount):
            if n == kSourceFindEvent and index == s:
                pieces.append("* ")
            else:
                pieces.append(str(count)+" ")
        pieces.append(" -- " + str(time/1000.) + " " + str(s) + " ")
        if waitTime is not None:
            pieces.append(" %.2f"% (waitTime/1000.))
            if waitTime > kStallThreshold:
                pieces.append(" STALLED")

        print("".join(pieces))
442 
443 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    stalledModules maps a module name to its stall times in microseconds.
    Every row shows the name, the total and the individual stall times
    (largest first), all in milliseconds. The input is left unmodified.
    """
    priorities = []
    maxNameSize = 0
    for name,t in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        # sorted() instead of list.sort() so the caller's lists keep their order
        ordered = sorted(t, reverse=True)
        priorities.append((name,sum(ordered),ordered))

    priorities.sort(key=itemgetter(1), reverse=True)

    nameColumn = "Stalled Module"
    maxNameSize = max(maxNameSize, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
    for n,s,t in priorities:
        paddedName = "%-*s:" % (maxNameSize,n)
        print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
464 
465 #--------------------------------------------------------
class Point:
    """Mutable pair of an x coordinate and a y value."""

    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return f"(x: {self.x}, y: {self.y})"

    def __repr__(self):
        # same text as str()
        return str(self)
476 
477 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge points that lie within kTimeFuzz of each other and drop zero sums.

    ps must already be sorted by x. Points whose x is closer than kTimeFuzz
    to the first point of the current group are folded into that point by
    adding up their y; groups whose y adds up to 0 are removed. A list with
    fewer than two points is returned unchanged (the same object).
    """
    if len(ps) < 2:
        return ps
    reducedPoints = []
    # work on a copy so the input points are not modified
    tmp = Point(ps[0].x, ps[0].y)
    for p in ps[1:]:
        if abs(tmp.x - p.x) < kTimeFuzz:
            tmp.y += p.y
        else:
            reducedPoints.append(tmp)
            tmp = Point(p.x, p.y)
    reducedPoints.append(tmp)
    reducedPoints = [p for p in reducedPoints if p.y != 0]
    return reducedPoints
492 
493 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (begin, length) pairs into +1/-1 points sorted by x.

    For every pair with a non-zero length a Point(begin, 1) and a
    Point(begin+length, -1) are produced; pairs of zero length are ignored.
    """
    points = []
    for pairList in pairLists:
        # all the +1 points of a list go in before its -1 points; the stable
        # sort below keeps that order for equal x
        points.extend(Point(pair[0], 1) for pair in pairList if pair[1] != 0)
        points.extend(Point(sum(pair), -1) for pair in pairList if pair[1] != 0)
    points.sort(key=attrgetter('x'))
    return points
501 
stackType = 'stack'

# --------------------------------------------
class Stack:
    """Accumulates graph layers, each one including all layers added before it."""

    def __init__(self):
        # list of (graphType, points) tuples, in the order they were added
        self.data = []

    def update(self, graphType, points):
        """Append a layer made of points plus the points of the previous layer.

        The combined points are sorted by x and passed through
        reduceSortedPoints(). The list passed in as points is not modified.
        """
        # build a new list instead of extending the caller's list in place
        combined = list(points)
        if len(self.data) != 0:
            combined += self.data[-1][1]

        combined.sort(key=attrgetter('x'))
        combined = reduceSortedPoints(combined)
        self.data.append((graphType, combined))
517 
518 #---------------------------------------------
519 # StreamInfoElement
class StreamInfoElement:
    """One colored block of a stream row: begin time, time length and color."""

    def __init__(self, begin_, delta_, color_):
        self.begin=begin_
        self.delta=delta_
        self.color=color_

    def unpack(self):
        """Return the tuple (begin, delta, color)."""
        return self.begin, self.delta, self.color
528 
529 #----------------------------------------------
530 # Consolidating contiguous blocks with the same color
531 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge neighbouring blocks of the same color on every stream.

    streamInfo holds one list of StreamInfoElement per stream. Two
    consecutive elements are merged when they have the same color and the
    second begins exactly where the first ends. A new list of lists is
    returned.
    """
    merged = [[] for x in range(numStreams)]

    for s in range(numStreams):
        elements = streamInfo[s]
        if not elements:
            continue
        curStart, curLength, curColor = elements[0].unpack()
        for info in elements[1:]:
            start, length, color = info.unpack()
            if color == curColor and curStart+curLength == start:
                curLength += length
                continue
            merged[s].append(StreamInfoElement(curStart, curLength, curColor))
            curStart, curLength, curColor = start, length, color
        # flush the block that is still open
        merged[s].append(StreamInfoElement(curStart, curLength, curColor))

    return merged
551 
552 #----------------------------------------------
553 # Consolidating contiguous blocks with the same color drastically
554 # reduces the size of the pdf file. Same functionality as the
555 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge consecutive (start, length, height) blocks of equal height.

    Two consecutive blocks are merged when they have the same height and the
    second starts within kTimeFuzz of the end of the first. A new list is
    returned; an empty input gives an empty list.
    """
    oldBlocks = blocks

    blocks = []
    if not oldBlocks:
        return blocks

    lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
    for start,length,height in oldBlocks[1:]:
        if height == lastHeight and abs(lastStartTime+lastTimeLength - start) < kTimeFuzz:
            lastTimeLength += length
        else:
            blocks.append((lastStartTime,lastTimeLength,lastHeight))
            lastStartTime = start
            lastTimeLength = length
            lastHeight = height
    # flush the block that is still open
    blocks.append((lastStartTime,lastTimeLength,lastHeight))

    return blocks
575 
576 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and/or record them for the stack plot.

    points are +1/-1 Points; their running sum gives the number of
    concurrent modules over time. Intervals whose count is below
    streamHeightCut are dropped. With doPlot set the remaining intervals are
    drawn on ax, grouped by count; with addToStackTimes set every interval is
    appended to allStackTimes[color] (count - threadOffset) times.
    """
    merged = reduceSortedPoints(sorted(points, key=attrgetter('x')))
    concurrent = 0
    intervals = []
    for current, following in zip(merged, merged[1:]):
        concurrent += current.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if concurrent >= streamHeightCut:
            intervals.append((current.x, following.x-current.x, concurrent))
    intervals.sort(key=itemgetter(2))
    intervals = mergeContiguousBlocks(intervals)

    for nthreads, group in groupby(intervals, itemgetter(2)):
        spans = [(entry[0], entry[1]) for entry in group]
        if doPlot:
            # microseconds -> seconds
            spansInSec = [(span[0]/1000000., span[1]/1000000.) for span in spans]
            yspan = (stream-0.4+height, height*(nthreads-1))
            ax.broken_barh(spansInSec, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(spans*(nthreads-threadOffset))
604 
605 #----------------------------------------------
606 # The same ES module can have multiple Proxies running concurrently
607 # so we need to reference count the names of the active modules
class RefCountSet(set):
    """A set whose members are reference counted.

    add() increments the count of an item; remove() decrements it and only
    takes the item out of the set when the count drops to zero. discard()
    and clear() keep the counts in step with the set contents.
    """

    def __init__(self):
        super().__init__()
        self.__itemsAndCount = dict()

    def add(self, item):
        """Add item, or count one more reference to it if already present."""
        self.__itemsAndCount[item] = self.__itemsAndCount.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        """Drop one reference to item; raises KeyError if item is not present."""
        v = self.__itemsAndCount[item]
        if v == 1:
            del self.__itemsAndCount[item]
            super().remove(item)
        else:
            self.__itemsAndCount[item] = v-1

    def discard(self, item):
        """Drop one reference to item if it is present, otherwise do nothing."""
        if item in self.__itemsAndCount:
            self.remove(item)

    def clear(self):
        """Remove all items and forget their reference counts."""
        self.__itemsAndCount.clear()
        super().clear()
623 
624 
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
    """Draw the stream stall graph and save it to pdfFile.

    processingSteps is an iterable of (name, trans, stream, time, isEvent)
    tuples; times are in microseconds and are plotted in seconds relative to
    the first step. stalledModuleInfo is iterated to obtain the names of the
    stalled modules. showStreams enables the per stream rows, shownStacks the
    stacked "# modules" plot. displayExternalWork enables tracking of
    external work (between the end of acquire and the module start).
    checkOrder enables the repair of out-of-order transitions. When setXAxis
    is true the x range of the stream plot is set to (xLower, xUpper).

    Colors of the lowest row: green = module running on an event,
    limegreen = module running something else, red = a stalled module is
    running, orange = source activity, darkviolet = no module running.

    Relies on the module-level name plt (matplotlib.pyplot), which is not
    imported in this function.

    NOTE(review): the nesting below was reconstructed from a listing that had
    lost its indentation. In particular confirm that the countSource /
    countDelayedSource bookkeeping is meant to run regardless of checkOrder.
    """
    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -= timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource:
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # names running on the stream before this transition is applied
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource:
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            # names running on the stream before this transition is applied
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            c = "green"
            if not isEvent:
                c = "limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            elif any(running in stalledModuleNames for running in moduleNames):
                c = "red"
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks and showStreams:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks and showStreams:
        # stream rows take the upper three quarters, the stack plot the rest
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))
    if shownStacks and not showStreams:
        axStack = ax

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))
    if setXAxis:
        ax.set_xlim((xLower, xUpper))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes = {'green': [], 'limegreen': [], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times = [(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
        colors = [x.color for x in lowestRow]
        # for each stream, plot the lowest row
        if showStreams:
            ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            level = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                level += p1.y
                xs.append((p1.x, p2.x-p1.x, level))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for blockHeight, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, blockHeight), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        # tick_params expects a bool here; the string 'off' is truthy
        axStack.tick_params(top=False)

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
869 
870 #=======================================
if __name__ == "__main__":
    # Script entry point: parse the command line, read the log file, then
    # either print the ASCII-art stall graph or write a graphical plot, and
    # finally report the stalled modules (plus optional per-module timings).
    #
    # NOTE: everything below deliberately stays at module scope instead of
    # being wrapped in a main() function. The plotting code in this file
    # appears to use `plt` as a module-level global (it is imported here only
    # when -g is given), so moving these statements into a function would hide
    # that name from it.
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    # '-g' alone selects the default name 'stall.pdf'; '-g NAME' selects NAME;
    # without '-g' the value stays None and the ASCII output is produced.
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
                        by itself, the default file name is \'stall.pdf\'. Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
                        Can be used only when -g and -s are specified.''')
    # store_false: args.external is True unless '-e' is given.
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file modules-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = args.timings

    # The x-axis range is applied only when an upper limit was supplied.
    setXAxis = args.upperxaxis is not None
    xUpper = args.upperxaxis if setXAxis else 0.0
    xLower = args.lowerxaxis

    doGraphic = pdfFile is not None
    if doGraphic:
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            # sys.exit rather than the bare exit(): the latter is a helper
            # installed by the `site` module and is not always available.
            sys.exit(1)

        if '.' in pdfFile:
            # Reject extensions the active matplotlib backend cannot write.
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                sys.exit(1)

    # Option consistency checks.
    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    # kTracerInput is read only after readLogFile() has run; Tracer input
    # always gets the transition-order check (see the -o help text).
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
Definition: TracksSoA.h:90
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
std::pair< unsigned int, unsigned int > unpack(cond::Time_t since)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
def createModuleTiming(processingSteps, numStreams)
def processingStepsFromStallMonitorOutput(f, moduleNames, esModuleNames)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
void add(std::map< std::string, TH1 *> &h, TH1 *hist)
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
#define str(s)
def exit(msg="")