edmStreamStallGrapher.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 from builtins import range
4 from itertools import groupby
5 from operator import attrgetter,itemgetter
6 import sys
7 from collections import defaultdict
8 #----------------------------------------------
9 def printHelp():
10  s = '''
11 To Use: Add the StallMonitor Service to the cmsRun job you want to check for
12  stream stalls. Use something like this in the configuration:
13 
14  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))
15 
16  After running the job, execute this script and pass the name of the
17  StallMonitor log file to the script.
18 
19  By default, the script will then print an 'ASCII art' stall graph
20  which consists of a line of text for each time a module or the
21  source stops or starts. Each line contains the name of the module
22  which either started or stopped running, and the number of modules
23  running on each stream at that moment in time. After that will be
24  the time and stream number. Then if a module just started, you
25  will also see the amount of time the module spent between finishing
26  its prefetching and starting. The state of a module is represented
27  by a symbol:
28 
29  plus ("+") the stream has just finished waiting and is starting a module
30  minus ("-") the stream just finished running a module
31 
32  If a module had to wait more than 0.1 seconds, the end of the line
33  will have "STALLED". Startup actions, e.g. reading conditions,
34  may affect results for the first few events.
35 
36  Using the command line arguments described above you can make the
37  program create a PDF file with actual graphs instead of the 'ASCII art'
38  output.
39 
40  Once the graph is completed, the program outputs the list of modules
41  which had the greatest total stall times. The list is sorted by
42  total stall time and written in descending order. In addition, the
43  list of all stall times for the module is given.
44 
45  There is an inferior alternative (an old obsolete way).
46  Instead of using the StallMonitor Service, you can use the
47  Tracer Service. Make sure to use the 'printTimestamps' option
48  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
49  There are problems associated with this and it is not recommended.'''
50  return s
51 
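# Illustrative usage (the log file name here is just an example):
#   edmStreamStallGrapher.py stallMonitor.log                    # ASCII art stall graph
#   edmStreamStallGrapher.py -g stall.pdf -s stallMonitor.log    # PDF graph plus stack plot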
52 kStallThreshold=100000 #in microseconds
53 kTracerInput=False
54 
55 #Stream states
56 kStarted=0
57 kFinished=1
58 kPrefetchEnd=2
59 kStartedAcquire=3
60 kFinishedAcquire=4
61 kStartedSource=5
62 kFinishedSource=6
63 kStartedSourceDelayedRead=7
64 kFinishedSourceDelayedRead=8
65 
66 #Special names
67 kSourceFindEvent = "sourceFindEvent"
68 kSourceDelayedRead ="sourceDelayedRead"
69 
70 #----------------------------------------------
71 def processingStepsFromStallMonitorOutput(f,moduleNames):
72  for rawl in f:
73  l = rawl.strip()
74  if not l or l[0] == '#':
75  continue
76  (step,payload) = tuple(l.split(None,1))
77  payload=payload.split()
78 
79  # Ignore these
80  if step == 'E' or step == 'e':
81  continue
82 
83  # Payload format is:
84  # <stream id> <..other fields..> <time since begin job>
85  stream = int(payload[0])
86  time = int(payload[-1])
87  trans = None
88  isEvent = True
89 
90  name = None
91  # 'S' = begin of event creation in source
92  # 's' = end of event creation in source
93  if step == 'S' or step == 's':
94  name = kSourceFindEvent
95  trans = kStartedSource
96  # The start of an event is the end of the framework part
97  if step == 's':
98  trans = kFinishedSource
99  else:
100  # moduleID is the second payload argument for all steps below
101  moduleID = payload[1]
102 
103  # 'p' = end of module prefetching
104  # 'M' = begin of module processing
105  # 'm' = end of module processing
106  if step == 'p' or step == 'M' or step == 'm':
107  trans = kStarted
108  if step == 'p':
109  trans = kPrefetchEnd
110  elif step == 'm':
111  trans = kFinished
112  if step == 'm' or step == 'M':
113  isEvent = (int(payload[2]) == 0)
114  name = moduleNames[moduleID]
115 
116  # 'A' = begin of module acquire function
117  # 'a' = end of module acquire function
118  elif step == 'A' or step == 'a':
119  trans = kStartedAcquire
120  if step == 'a':
121  trans = kFinishedAcquire
122  name = moduleNames[moduleID]
123 
124  # Delayed read from source
125  # 'R' = begin of delayed read from source
126  # 'r' = end of delayed read from source
127  elif step == 'R' or step == 'r':
128  trans = kStartedSourceDelayedRead
129  if step == 'r':
130  trans = kFinishedSourceDelayedRead
131  name = kSourceDelayedRead
132 
133  if trans is not None:
134  yield (name,trans,stream,time, isEvent)
135 
136  return
137 
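# For reference: each parser produces processing steps as tuples of the form
#   (module or source name, transition constant, stream number,
#    time in microseconds, isEvent flag)
# which the analysis functions below consume.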
138 class StallMonitorParser(object):
139  def __init__(self,f):
140  numStreams = 0
141  numStreamsFromSource = 0
142  moduleNames = {}
143  for rawl in f:
144  l = rawl.strip()
145  if l and l[0] == 'M':
146  i = l.split(' ')
147  if i[3] == '4':
148  #found global begin run
149  numStreams = int(i[1])+1
150  break
151  if numStreams == 0 and l and l[0] == 'S':
152  s = int(l.split(' ')[1])
153  if s > numStreamsFromSource:
154  numStreamsFromSource = s
155  if len(l) > 5 and l[0:2] == "#M":
156  (id,name)=tuple(l[2:].split())
157  moduleNames[id] = name
158  continue
159  self._f = f
160  if numStreams == 0:
161  numStreams = numStreamsFromSource +1
162  self.numStreams =numStreams
163  self._moduleNames = moduleNames
164  self.maxNameSize =0
165  for n in moduleNames.values(): # use the names (not the (id,name) items) for the column width
166  self.maxNameSize = max(self.maxNameSize,len(n))
167  self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
168 
169  def processingSteps(self):
170  """Create a generator which can step through the file and return each processing step.
171  Using a generator reduces the memory overhead when parsing a large file.
172  """
173  self._f.seek(0)
174  return processingStepsFromStallMonitorOutput(self._f,self._moduleNames)
175 
176 #----------------------------------------------
177 # Utility to get time out of Tracer output text format
178 def getTime(line):
179  time = line.split(" ")[1]
180  time = time.split(":")
181  time = int(time[0])*60*60+int(time[1])*60+float(time[2])
182  time = int(1000000*time) # convert to microseconds
183  return time
184 
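# Worked example (hypothetical Tracer timestamp): if the second whitespace-separated
# field of a line is "01:02:03.5", getTime returns
#   (1*3600 + 2*60 + 3.5) * 1000000 = 3723500000 microseconds.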
185 #----------------------------------------------
186 # The next function parses the Tracer output.
187 # Here are some differences to consider if you use Tracer output
188 # instead of the StallMonitor output.
189 # - The time in the text of the Tracer output is not as precise
190 # as the StallMonitor (.01 s vs .001 s)
191 # - The MessageLogger bases the time on when the message was printed
192 # and not on when it was initially queued up to print, which smears
193 # the accuracy of the times.
194 # - Both of the previous things can produce some strange effects
195 # in the output plots.
196 # - The file size of the Tracer text file is much larger.
197 # - The CPU work needed to parse the Tracer files is larger.
198 # - The Tracer log file is expected to have "++" in the first
199 # or fifth line. If there are extraneous lines at the beginning
200 # you have to remove them.
201 # - The ASCII printout will have one extraneous line
202 # near the end for the SourceFindEvent start.
203 # - The only advantage I can see is that you have only
204 # one output file to handle instead of two, the regular
205 # log file and the StallMonitor output.
206 # We should probably just delete the Tracer option because it is
207 # clearly inferior ...
208 def parseTracerOutput(f):
209  processingSteps = []
210  numStreams = 0
211  maxNameSize = 0
212  startTime = 0
213  streamsThatSawFirstEvent = set()
214  for l in f:
215  trans = None
216  # We estimate the start and stop of the source
217  # by the end of the previous event and start of
218  # the event. This is historical, probably because
219  # the Tracer output for the begin and end of the
220  # source event does not include the stream number.
221  if l.find("processing event :") != -1:
222  name = kSourceFindEvent
223  trans = kStartedSource
224  # the end of the source is estimated using the start of the event
225  if l.find("starting:") != -1:
226  trans = kFinishedSource
227  elif l.find("processing event for module") != -1:
228  trans = kStarted
229  if l.find("finished:") != -1:
230  if l.find("prefetching") != -1:
231  trans = kPrefetchEnd
232  else:
233  trans = kFinished
234  else:
235  if l.find("prefetching") != -1:
236  #skip this since we don't care about prefetch starts
237  continue
238  name = l.split("'")[1]
239  elif l.find("processing event acquire for module:") != -1:
240  trans = kStartedAcquire
241  if l.find("finished:") != -1:
242  trans = kFinishedAcquire
243  name = l.split("'")[1]
244  elif l.find("event delayed read from source") != -1:
245  trans = kStartedSourceDelayedRead
246  if l.find("finished:") != -1:
247  trans = kFinishedSourceDelayedRead
248  name = kSourceDelayedRead
249  if trans is not None:
250  time = getTime(l)
251  if startTime == 0:
252  startTime = time
253  time = time - startTime
254  streamIndex = l.find("stream = ")
255  stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
256  maxNameSize = max(maxNameSize, len(name))
257 
258  if trans == kFinishedSource and not stream in streamsThatSawFirstEvent:
259  # This is wrong but there is no way to estimate the time better
260  # because there is no previous event for the first event.
261  processingSteps.append((name,kStartedSource,stream,time,True))
262  streamsThatSawFirstEvent.add(stream)
263 
264  processingSteps.append((name,trans,stream,time, True))
265  numStreams = max(numStreams, stream+1)
266 
267  f.close()
268  return (processingSteps,numStreams,maxNameSize)
269 
270 class TracerParser(object):
271  def __init__(self,f):
272  self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)
273  def processingSteps(self):
274  return self._processingSteps
275 
276 #----------------------------------------------
277 def chooseParser(inputFile):
278 
279  firstLine = inputFile.readline().rstrip()
280  for i in range(3):
281  inputFile.readline()
282  # Often the Tracer log file starts with 4 lines not from the Tracer
283  fifthLine = inputFile.readline().rstrip()
284  inputFile.seek(0) # Rewind back to beginning
285  if (firstLine.find("# Transition") != -1) or (firstLine.find("# Step") != -1):
286  print("> ... Parsing StallMonitor output.")
287  return StallMonitorParser
288 
289  if firstLine.find("++") != -1 or fifthLine.find("++") != -1:
290  global kTracerInput
291  kTracerInput = True
292  print("> ... Parsing Tracer output.")
293  return TracerParser
294  else:
295  inputFile.close()
296  print("Unknown input format.")
297  exit(1)
298 
299 #----------------------------------------------
300 def readLogFile(inputFile):
301  parseInput = chooseParser(inputFile)
302  return parseInput(inputFile)
303 
304 #----------------------------------------------
305 #
306 # modules: The time between the end of prefetching and 'start processing' is
307 # the time it took to acquire any resources, which is by definition the
308 # stall time.
309 #
310 # source: The source just records how long it spent doing work,
311 # not how long it was stalled. We can get a lower bound on the stall
312 # time for delayed reads by measuring the time the stream was doing
313 # no work up till the start of the source delayed read.
314 #
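# Worked example (illustrative numbers): if a module finishes prefetching at
# t = 1.00 s and starts running at t = 1.25 s, its stall time is 0.25 s, which
# exceeds kStallThreshold (0.1 s) and so gets recorded for that module.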
315 def findStalledModules(processingSteps, numStreams):
316  streamTime = [0]*numStreams
317  streamState = [0]*numStreams
318  stalledModules = {}
319  modulesActiveOnStream = [{} for x in range(numStreams)]
320  for n,trans,s,time,isEvent in processingSteps:
321 
322  waitTime = None
323  modulesOnStream = modulesActiveOnStream[s]
324  if trans == kPrefetchEnd:
325  modulesOnStream[n] = time
326  elif trans == kStarted or trans == kStartedAcquire:
327  if n in modulesOnStream:
328  waitTime = time - modulesOnStream[n]
329  modulesOnStream.pop(n, None)
330  streamState[s] +=1
331  elif trans == kFinished or trans == kFinishedAcquire:
332  streamState[s] -=1
333  streamTime[s] = time
334  elif trans == kStartedSourceDelayedRead:
335  if streamState[s] == 0:
336  waitTime = time - streamTime[s]
337  elif trans == kStartedSource:
338  modulesOnStream.clear()
339  elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
340  streamTime[s] = time
341  if waitTime is not None:
342  if waitTime > kStallThreshold:
343  t = stalledModules.setdefault(n,[])
344  t.append(waitTime)
345  return stalledModules
346 
347 
348 def createModuleTiming(processingSteps, numStreams):
349  import json
350  streamTime = [0]*numStreams
351  streamState = [0]*numStreams
352  moduleTimings = defaultdict(list)
353  modulesActiveOnStream = [defaultdict(int) for x in range(numStreams)]
354  for n,trans,s,time,isEvent in processingSteps:
355  waitTime = None
356  modulesOnStream = modulesActiveOnStream[s]
357  if isEvent:
358  if trans == kStarted:
359  streamState[s] = 1
360  modulesOnStream[n]=time
361  elif trans == kFinished:
362  waitTime = time - modulesOnStream[n]
363  modulesOnStream.pop(n, None)
364  streamState[s] = 0
365  moduleTimings[n].append(float(waitTime/1000.))
366 
367  with open('module-timings.json', 'w') as outfile:
368  outfile.write(json.dumps(moduleTimings, indent=4))
369 
370 #----------------------------------------------
371 def createAsciiImage(processingSteps, numStreams, maxNameSize):
372  streamTime = [0]*numStreams
373  streamState = [0]*numStreams
374  modulesActiveOnStreams = [{} for x in range(numStreams)]
375  for n,trans,s,time,isEvent in processingSteps:
376  waitTime = None
377  modulesActiveOnStream = modulesActiveOnStreams[s]
378  if trans == kPrefetchEnd:
379  modulesActiveOnStream[n] = time
380  continue
381  elif trans == kStartedAcquire or trans == kStarted:
382  if n in modulesActiveOnStream:
383  waitTime = time - modulesActiveOnStream[n]
384  modulesActiveOnStream.pop(n, None)
385  streamState[s] +=1
386  elif trans == kFinishedAcquire or trans == kFinished:
387  streamState[s] -=1
388  streamTime[s] = time
389  elif trans == kStartedSourceDelayedRead:
390  if streamState[s] == 0:
391  waitTime = time - streamTime[s]
392  elif trans == kStartedSource:
393  modulesActiveOnStream.clear()
394  elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
395  streamTime[s] = time
396  states = "%-*s: " % (maxNameSize,n)
397  if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
398  states +="+ "
399  else:
400  states +="- "
401  for index, state in enumerate(streamState):
402  if n==kSourceFindEvent and index == s:
403  states +="* "
404  else:
405  states +=str(state)+" "
406  states += " -- " + str(time/1000.) + " " + str(s) + " "
407  if waitTime is not None:
408  states += " %.2f"% (waitTime/1000.)
409  if waitTime > kStallThreshold:
410  states += " STALLED"
411 
412  print(states)
413 
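# A line of the ASCII output produced above looks roughly like this (made-up values,
# four streams; the padded module name, "+" or "-", per-stream module counts,
# the time in ms, the stream number, the wait time, and the stall flag):
#   someModule     : + 2 1 0 0  -- 123.456 0  150.00 STALLED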
414 #----------------------------------------------
415 def printStalledModulesInOrder(stalledModules):
416  priorities = []
417  maxNameSize = 0
418  for name,t in stalledModules.items():
419  maxNameSize = max(maxNameSize, len(name))
420  t.sort(reverse=True)
421  priorities.append((name,sum(t),t))
422 
423  priorities.sort(key=lambda a: a[1], reverse=True)
424 
425  nameColumn = "Stalled Module"
426  maxNameSize = max(maxNameSize, len(nameColumn))
427 
428  stallColumn = "Tot Stall Time"
429  stallColumnLength = len(stallColumn)
430 
431  print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
432  for n,s,t in priorities:
433  paddedName = "%-*s:" % (maxNameSize,n)
434  print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
435 
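# The report printed by printStalledModulesInOrder looks roughly like this
# (made-up module name and times, all values in milliseconds):
#   Stalled Module     Tot Stall Time  Stall Times
#   someStalledModule: 350.00          200.00, 150.00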
436 #--------------------------------------------------------
437 class Point:
438  def __init__(self, x_, y_):
439  self.x = x_
440  self.y = y_
441 
442  def __str__(self):
443  return "(x: {}, y: {})".format(self.x,self.y)
444 
445  def __repr__(self):
446  return self.__str__()
447 
448 #--------------------------------------------------------
449 def reduceSortedPoints(ps):
450  if len(ps) < 2:
451  return ps
452  reducedPoints = []
453  tmp = Point(ps[0].x, ps[0].y)
454  for p in ps[1:]:
455  if tmp.x == p.x:
456  tmp.y += p.y
457  else:
458  reducedPoints.append(tmp)
459  tmp = Point(p.x, p.y)
460  reducedPoints.append(tmp)
461  reducedPoints = [p for p in reducedPoints if p.y != 0]
462  return reducedPoints
463 
464 # -------------------------------------------
465 def adjacentDiff(*pairLists):
466  points = []
467  for pairList in pairLists:
468  points += [Point(x[0], 1) for x in pairList if x[1] != 0]
469  points += [Point(sum(x),-1) for x in pairList if x[1] != 0]
470  points.sort(key=attrgetter('x'))
471  return points
472 
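# Worked example (illustrative numbers) for the two helpers above:
# adjacentDiff([(0,5),(5,4)]) gives edge points (0,+1),(5,+1),(5,-1),(9,-1);
# reduceSortedPoints then cancels the +1/-1 pair at x=5, leaving (0,+1),(9,-1),
# i.e. one continuous interval from t=0 to t=9.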
473 stackType = 'stack'
474 
475 # --------------------------------------------
476 class Stack:
477  def __init__(self):
478  self.data = []
479 
480  def update(self, graphType, points):
481  tmp = points
482  if len(self.data) != 0:
483  tmp += self.data[-1][1]
484 
485  tmp.sort(key=attrgetter('x'))
486  tmp = reduceSortedPoints(tmp)
487  self.data.append((graphType, tmp))
488 
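# Note: after successive update() calls, each entry of self.data also contains the
# points of all earlier entries, so the last (graphType, points) pair describes the
# full height of the stacked plot.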
489 #---------------------------------------------
490 # StreamInfoElement
491 class StreamInfoElement:
492  def __init__(self, begin_, delta_, color_):
493  self.begin=begin_
494  self.delta=delta_
495  self.color=color_
496 
497  def unpack(self):
498  return self.begin, self.delta, self.color
499 
500 #----------------------------------------------
501 # Consolidating contiguous blocks with the same color
502 # drastically reduces the size of the pdf file.
503 def consolidateContiguousBlocks(numStreams, streamInfo):
504  oldStreamInfo = streamInfo
505  streamInfo = [[] for x in range(numStreams)]
506 
507  for s in range(numStreams):
508  if oldStreamInfo[s]:
509  lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].unpack()
510  for info in oldStreamInfo[s][1:]:
511  start,length,color = info.unpack()
512  if color == lastColor and lastStartTime+lastTimeLength == start:
513  lastTimeLength += length
514  else:
515  streamInfo[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))
516  lastStartTime = start
517  lastTimeLength = length
518  lastColor = color
519  streamInfo[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))
520 
521  return streamInfo
522 
523 #----------------------------------------------
524 # Consolidating contiguous blocks with the same color drastically
525 # reduces the size of the pdf file. Same functionality as the
526 # previous function, but with slightly different implementation.
527 def mergeContiguousBlocks(blocks):
528  oldBlocks = blocks
529 
530  blocks = []
531  if not oldBlocks:
532  return blocks
533 
534  lastStartTime,lastTimeLength,lastHeight = oldBlocks[0]
535  for start,length,height in oldBlocks[1:]:
536  if height == lastHeight and lastStartTime+lastTimeLength == start:
537  lastTimeLength += length
538  else:
539  blocks.append((lastStartTime,lastTimeLength,lastHeight))
540  lastStartTime = start
541  lastTimeLength = length
542  lastHeight = height
543  blocks.append((lastStartTime,lastTimeLength,lastHeight))
544 
545  return blocks
546 
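# Worked example (illustrative numbers):
#   mergeContiguousBlocks([(0,2,3),(2,3,3),(5,1,2)]) returns [(0,5,3),(5,1,2)]
# because the first two blocks are contiguous and have the same height.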
547 #----------------------------------------------
548 def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
549  points = sorted(points, key=attrgetter('x'))
550  points = reduceSortedPoints(points)
551  streamHeight = 0
552  preparedTimes = []
553  for t1,t2 in zip(points, points[1:]):
554  streamHeight += t1.y
555  # We make a cut here when plotting because the first row for
556  # each stream was already plotted previously and we do not
557  # need to plot it again. We also want to count things properly
558  # in allStackTimes and avoid double counting or missing running
559  # modules, which is complicated because the modules in the first
560  # row were already counted.
561  if streamHeight < streamHeightCut:
562  continue
563  preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
564  preparedTimes.sort(key=itemgetter(2))
565  preparedTimes = mergeContiguousBlocks(preparedTimes)
566 
567  for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
568  theTS = [(t[0],t[1]) for t in ts]
569  if doPlot:
570  theTimes = [(t[0]/1000000.,t[1]/1000000.) for t in theTS]
571  yspan = (stream-0.4+height,height*(nthreads-1))
572  ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
573  if addToStackTimes:
574  allStackTimes[color].extend(theTS*(nthreads-threadOffset))
575 
576 #----------------------------------------------
577 def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
578 
579  stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
580  streamLowestRow = [[] for x in range(numStreams)]
581  modulesActiveOnStreams = [set() for x in range(numStreams)]
582  acquireActiveOnStreams = [set() for x in range(numStreams)]
583  externalWorkOnStreams = [set() for x in range(numStreams)]
584  previousFinishTime = [None for x in range(numStreams)]
585  streamRunningTimes = [[] for x in range(numStreams)]
586  streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
587  maxNumberOfConcurrentModulesOnAStream = 1
588  externalWorkModulesInJob = False
589  previousTime = [0 for x in range(numStreams)]
590 
591  # The next five variables are only used to check for out of order transitions
592  finishBeforeStart = [set() for x in range(numStreams)]
593  finishAcquireBeforeStart = [set() for x in range(numStreams)]
594  countSource = [0 for x in range(numStreams)]
595  countDelayedSource = [0 for x in range(numStreams)]
596  countExternalWork = [defaultdict(int) for x in range(numStreams)]
597 
598  timeOffset = None
599  for n,trans,s,time,isEvent in processingSteps:
600  if timeOffset is None:
601  timeOffset = time
602  startTime = None
603  time -=timeOffset
604  # force the time to monotonically increase on each stream
605  if time < previousTime[s]:
606  time = previousTime[s]
607  previousTime[s] = time
608 
609  activeModules = modulesActiveOnStreams[s]
610  acquireModules = acquireActiveOnStreams[s]
611  externalWorkModules = externalWorkOnStreams[s]
612 
613  if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
614  if checkOrder:
615  # Note that the code which checks the order of transitions assumes that
616  # all the transitions exist in the input. It is checking only for order
617  # problems, usually a finish before the corresponding start. Problems are fixed and
618  # silently ignored. Nothing gets plotted for transitions that are
619  # in the wrong order.
620  if trans == kStarted:
621  countExternalWork[s][n] -= 1
622  if n in finishBeforeStart[s]:
623  finishBeforeStart[s].remove(n)
624  continue
625  elif trans == kStartedAcquire:
626  if n in finishAcquireBeforeStart[s]:
627  finishAcquireBeforeStart[s].remove(n)
628  continue
629 
630  if trans == kStartedSourceDelayedRead:
631  countDelayedSource[s] += 1
632  if countDelayedSource[s] < 1:
633  continue
634  elif trans == kStartedSource:
635  countSource[s] += 1
636  if countSource[s] < 1:
637  continue
638 
639  moduleNames = activeModules.copy()
640  moduleNames.update(acquireModules)
641  if trans == kStartedAcquire:
642  acquireModules.add(n)
643  else:
644  activeModules.add(n)
645  streamRunningTimes[s].append(Point(time,1))
646  if moduleNames or externalWorkModules:
647  startTime = previousFinishTime[s]
648  previousFinishTime[s] = time
649 
650  if trans == kStarted and n in externalWorkModules:
651  externalWorkModules.remove(n)
652  streamExternalWorkRunningTimes[s].append(Point(time, -1))
653  else:
654  nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
655  maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
656  elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
657  if checkOrder:
658  if trans == kFinished:
659  if n not in activeModules:
660  finishBeforeStart[s].add(n)
661  continue
662 
663  if trans == kFinishedSourceDelayedRead:
664  countDelayedSource[s] -= 1
665  if countDelayedSource[s] < 0:
666  continue
667  elif trans == kFinishedSource:
668  countSource[s] -= 1
669  if countSource[s] < 0:
670  continue
671 
672  if trans == kFinishedAcquire:
673  if checkOrder:
674  countExternalWork[s][n] += 1
675  if displayExternalWork:
676  externalWorkModulesInJob = True
677  if (not checkOrder) or countExternalWork[s][n] > 0:
678  externalWorkModules.add(n)
679  streamExternalWorkRunningTimes[s].append(Point(time,+1))
680  if checkOrder and n not in acquireModules:
681  finishAcquireBeforeStart[s].add(n)
682  continue
683  streamRunningTimes[s].append(Point(time,-1))
684  startTime = previousFinishTime[s]
685  previousFinishTime[s] = time
686  moduleNames = activeModules.copy()
687  moduleNames.update(acquireModules)
688 
689  if trans == kFinishedAcquire:
690  acquireModules.remove(n)
691  elif trans == kFinishedSourceDelayedRead:
692  if countDelayedSource[s] == 0:
693  activeModules.remove(n)
694  elif trans == kFinishedSource:
695  if countSource[s] == 0:
696  activeModules.remove(n)
697  else:
698  activeModules.remove(n)
699 
700  if startTime is not None:
701  c="green"
702  if not isEvent:
703  c="limegreen"
704  if not moduleNames:
705  c = "darkviolet"
706  elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
707  c = "orange"
708  else:
709  for n in moduleNames:
710  if n in stalledModuleNames:
711  c="red"
712  break
713  streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
714  streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)
715 
716  nr = 1
717  if shownStacks:
718  nr += 1
719  fig, ax = plt.subplots(nrows=nr, squeeze=True)
720  axStack = None
721  if shownStacks:
722  [xH,yH] = fig.get_size_inches()
723  fig.set_size_inches(xH,yH*4/3)
724  ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
725  axStack = plt.subplot2grid((4,1),(3,0))
726 
727  ax.set_xlabel("Time (sec)")
728  ax.set_ylabel("Stream ID")
729  ax.set_ylim(-0.5,numStreams-0.5)
730  ax.yaxis.set_ticks(range(numStreams))
731  if (setXAxis):
732  ax.set_xlim((xLower, xUpper))
733 
734  height = 0.8/maxNumberOfConcurrentModulesOnAStream
735  allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
736  for iStream,lowestRow in enumerate(streamLowestRow):
737  times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
738  colors=[x.color for x in lowestRow]
739  # for each stream, plot the lowest row
740  ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
741  # record them also for inclusion in the stack plot
742  # the darkviolet ones get counted later so do not count them here
743  for info in lowestRow:
744  if not info.color == 'darkviolet':
745  allStackTimes[info.color].append((info.begin, info.delta))
746 
747  # Now superimpose the number of concurrently running modules on to the graph.
748  if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:
749 
750  for i,perStreamRunningTimes in enumerate(streamRunningTimes):
751 
752  perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
753  perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
754 
755  plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
756  allStackTimes, ax, i, height,
757  streamHeightCut=2,
758  doPlot=True,
759  addToStackTimes=False,
760  color='darkviolet',
761  threadOffset=1)
762 
763  plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
764  allStackTimes, ax, i, height,
765  streamHeightCut=2,
766  doPlot=True,
767  addToStackTimes=True,
768  color='blue',
769  threadOffset=1)
770 
771  plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
772  allStackTimes, ax, i, height,
773  streamHeightCut=1,
774  doPlot=False,
775  addToStackTimes=True,
776  color='darkviolet',
777  threadOffset=0)
778 
779  if shownStacks:
780  print("> ... Generating stack")
781  stack = Stack()
782  for color in ['green','limegreen','blue','red','orange','darkviolet']:
783  tmp = allStackTimes[color]
784  tmp = reduceSortedPoints(adjacentDiff(tmp))
785  stack.update(color, tmp)
786 
787  for stk in reversed(stack.data):
788  color = stk[0]
789 
790  # Now arrange list in a manner that it can be grouped by the height of the block
791  height = 0
792  xs = []
793  for p1,p2 in zip(stk[1], stk[1][1:]):
794  height += p1.y
795  xs.append((p1.x, p2.x-p1.x, height))
796  xs.sort(key = itemgetter(2))
797  xs = mergeContiguousBlocks(xs)
798 
799  for height, xpairs in groupby(xs, itemgetter(2)):
800  finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
801  # plot the stacked plot, one color and one height on each call to broken_barh
802  axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
803 
804  axStack.set_xlabel("Time (sec)");
805  axStack.set_ylabel("# modules");
806  axStack.set_xlim(ax.get_xlim())
807  axStack.tick_params(top='off')
808 
809  fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
810  fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
811  fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
812  fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
813  fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
814  if displayExternalWork:
815  fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
816  print("> ... Saving to file: '{}'".format(pdfFile))
817  plt.savefig(pdfFile)
818 
819 #=======================================
820 if __name__=="__main__":
821  import argparse
822  import re
823  import sys
824 
825  # Program options
826  parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
827  formatter_class=argparse.RawDescriptionHelpFormatter,
828  epilog=printHelp())
829  parser.add_argument('filename',
830  type=argparse.FileType('r'), # open file
831  help='file to process')
832  parser.add_argument('-g', '--graph',
833  nargs='?',
834  metavar="'stall.pdf'",
835  const='stall.pdf',
836  dest='graph',
837  help='''Create pdf file of stream stall graph. If -g is specified
838  by itself, the default file name is \'stall.pdf\'. Otherwise, the
839  argument to the -g option is the filename.''')
840  parser.add_argument('-s', '--stack',
841  action='store_true',
842  help='''Create stack plot, combining all stream-specific info.
843  Can be used only when -g is specified.''')
844  parser.add_argument('-e', '--external',
845  action='store_false',
846  help='''Suppress display of external work in graphs.''')
847  parser.add_argument('-o', '--order',
848  action='store_true',
849  help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
850  parser.add_argument('-t', '--timings',
851  action='store_true',
852  help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary as a JSON file named module-timings.json.''')
853  parser.add_argument('-l', '--lowerxaxis',
854  type=float,
855  default=0.0,
856  help='''Lower limit of x axis, default 0, not used if upper limit not set''')
857  parser.add_argument('-u', '--upperxaxis',
858  type=float,
859  help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
860  args = parser.parse_args()
861 
862  # Process parsed options
863  inputFile = args.filename
864  pdfFile = args.graph
865  shownStacks = args.stack
866  displayExternalWork = args.external
867  checkOrder = args.order
868  doModuleTimings = False
869  if args.timings:
870  doModuleTimings = True
871 
872  setXAxis = False
873  xUpper = 0.0
874  if args.upperxaxis is not None:
875  setXAxis = True
876  xUpper = args.upperxaxis
877  xLower = args.lowerxaxis
878 
879  doGraphic = False
880  if pdfFile is not None:
881  doGraphic = True
882  import matplotlib
883  # Need to force display since problems with CMSSW matplotlib.
884  matplotlib.use("PDF")
885  import matplotlib.pyplot as plt
886  if not re.match(r'^[\w\.]+$', pdfFile):
887  print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
888  print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
889  exit(1)
890 
891  if '.' in pdfFile:
892  extension = pdfFile.split('.')[-1]
893  supported_filetypes = plt.figure().canvas.get_supported_filetypes()
894  if not extension in supported_filetypes:
895  print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
896  print("The allowed extensions are:")
897  for filetype in supported_filetypes:
898  print(" '.{}'".format(filetype))
899  exit(1)
900 
901  if pdfFile is None and shownStacks:
902  print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
903  exit(1)
904 
905  sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
906  reader = readLogFile(inputFile)
907  if kTracerInput:
908  checkOrder = True
909  sys.stderr.write(">processing data\n")
910  stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)
911 
912 
913  if not doGraphic:
914  sys.stderr.write(">preparing ASCII art\n")
915  createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
916  else:
917  sys.stderr.write(">creating PDF\n")
918  createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
919  printStalledModulesInOrder(stalledModules)
920  if doModuleTimings:
921  sys.stderr.write(">creating module-timings.json\n")
922  createModuleTiming(reader.processingSteps(), reader.numStreams)