CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from builtins import range
4 from itertools import groupby
5 from operator import attrgetter,itemgetter
6 import sys
7 from collections import defaultdict
8 import six
9 
10 #----------------------------------------------
def printHelp():
    """Return the usage/epilog text shown by argparse (see the __main__ section)."""
    s = '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
    return s
53 
# A wait longer than this is reported as a stall
kStallThreshold=100000 #in microseconds
# Set to True when the input is Tracer (not StallMonitor) output;
# the __main__ section then forces transition-order checking
kTracerInput=False

#Stream states
# Transition codes yielded by the parsers as the second tuple element
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
# Used in place of a module label for source activity
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"
71 
72 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f, moduleNames):
    """Yield (name, transition, stream, time, isEvent) tuples from a StallMonitor log.

    f: open StallMonitor log file.
    moduleNames: dict mapping module id (string) -> module label.
    Implemented as a generator to keep the memory overhead small for large files.
    """
    for rawl in f:
        l = rawl.strip()
        # skip blank lines and '#' comment/header lines
        if not l or l[0] == '#':
            continue
        (step,payload) = tuple(l.split(None,1))
        payload=payload.split()

        # Ignore these
        if step == 'E' or step == 'e':
            continue

        # Payload format is:
        # <stream id> <..other fields..> <time since begin job>
        stream = int(payload[0])
        time = int(payload[-1])
        trans = None
        isEvent = True

        name = None
        # 'S' = begin of event creation in source
        # 's' = end of event creation in source
        if step == 'S' or step == 's':
            name = kSourceFindEvent
            trans = kStartedSource
            # The start of an event is the end of the framework part
            if step == 's':
                trans = kFinishedSource
        else:
            # moduleID is the second payload argument for all steps below
            moduleID = payload[1]

            # 'p' = end of module prefetching
            # 'M' = begin of module processing
            # 'm' = end of module processing
            if step == 'p' or step == 'M' or step == 'm':
                trans = kStarted
                if step == 'p':
                    trans = kPrefetchEnd
                elif step == 'm':
                    trans = kFinished
                if step == 'm' or step == 'M':
                    # payload[2] == 0 marks an event transition (non-zero: e.g. run/lumi)
                    isEvent = (int(payload[2]) == 0)
                name = moduleNames[moduleID]

            # 'A' = begin of module acquire function
            # 'a' = end of module acquire function
            elif step == 'A' or step == 'a':
                trans = kStartedAcquire
                if step == 'a':
                    trans = kFinishedAcquire
                name = moduleNames[moduleID]

            # Delayed read from source
            # 'R' = begin of delayed read from source
            # 'r' = end of delayed read from source
            elif step == 'R' or step == 'r':
                trans = kStartedSourceDelayedRead
                if step == 'r':
                    trans = kFinishedSourceDelayedRead
                name = kSourceDelayedRead

        if trans is not None:
            yield (name,trans,stream,time, isEvent)

    return
139 
141  def __init__(self,f):
142  numStreams = 0
143  numStreamsFromSource = 0
144  moduleNames = {}
145  for rawl in f:
146  l = rawl.strip()
147  if l and l[0] == 'M':
148  i = l.split(' ')
149  if i[3] == '4':
150  #found global begin run
151  numStreams = int(i[1])+1
152  break
153  if numStreams == 0 and l and l[0] == 'S':
154  s = int(l.split(' ')[1])
155  if s > numStreamsFromSource:
156  numStreamsFromSource = s
157  if len(l) > 5 and l[0:2] == "#M":
158  (id,name)=tuple(l[2:].split())
159  moduleNames[id] = name
160  continue
161  self._f = f
162  if numStreams == 0:
163  numStreams = numStreamsFromSource +1
164  self.numStreams =numStreams
165  self._moduleNames = moduleNames
166  self.maxNameSize =0
167  for n in six.iteritems(moduleNames):
168  self.maxNameSize = max(self.maxNameSize,len(n))
169  self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
170 
    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        # Rewind so the file can be parsed (again) from the beginning.
        # NOTE(review): this rendering appears to omit the final statement that
        # builds the generator from self._f and self._moduleNames (presumably a
        # return of processingStepsFromStallMonitorOutput) — confirm against the
        # full source.
        self._f.seek(0)
177 
178 #----------------------------------------------
179 # Utility to get time out of Tracer output text format
def getTime(line):
    """Convert the HH:MM:SS.sss timestamp (second whitespace-separated field of a
    Tracer log line) into an integer number of microseconds."""
    stamp = line.split(" ")[1]
    hours, minutes, seconds = stamp.split(":")
    totalSeconds = int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)
    return int(1000000 * totalSeconds)  # convert to microseconds
186 
187 #----------------------------------------------
188 # The next function parses the Tracer output.
189 # Here are some differences to consider if you use Tracer output
190 # instead of the StallMonitor output.
191 # - The time in the text of the Tracer output is not as precise
192 # as the StallMonitor (.01 s vs .001 s)
193 # - The MessageLogger bases the time on when the message printed
194 # and not when it was initially queued up to print which smears
195 # the accuracy of the times.
196 # - Both of the previous things can produce some strange effects
197 # in the output plots.
198 # - The file size of the Tracer text file is much larger.
199 # - The CPU work needed to parse the Tracer files is larger.
200 # - The Tracer log file is expected to have "++" in the first
201 # or fifth line. If there are extraneous lines at the beginning
202 # you have to remove them.
203 # - The ascii printout will have one extraneous line
204 # near the end for the SourceFindEvent start.
205 # - The only advantage I can see is that you have only
206 # one output file to handle instead of two, the regular
207 # log file and the StallMonitor output.
208 # We should probably just delete the Tracer option because it is
209 # clearly inferior ...
def parseTracerOutput(f):
    """Parse a Tracer service log and return (processingSteps, numStreams, maxNameSize).

    processingSteps is a list of (name, transition, stream, time, isEvent)
    tuples with time in microseconds relative to the first parsed transition.
    Every step is flagged isEvent=True because the Tracer text does not
    distinguish event from non-event transitions. Closes f when done.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for l in f:
        trans = None
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if l.find("processing event :") != -1:
            name = kSourceFindEvent
            trans = kStartedSource
            # the end of the source is estimated using the start of the event
            if l.find("starting:") != -1:
                trans = kFinishedSource
        elif l.find("processing event for module") != -1:
            trans = kStarted
            if l.find("finished:") != -1:
                if l.find("prefetching") != -1:
                    trans = kPrefetchEnd
                else:
                    trans = kFinished
            else:
                if l.find("prefetching") != -1:
                    #skip this since we don't care about prefetch starts
                    continue
            # the module label is quoted in the message text
            name = l.split("'")[1]
        elif l.find("processing event acquire for module:") != -1:
            trans = kStartedAcquire
            if l.find("finished:") != -1:
                trans = kFinishedAcquire
            name = l.split("'")[1]
        elif l.find("event delayed read from source") != -1:
            trans = kStartedSourceDelayedRead
            if l.find("finished:") != -1:
                trans = kFinishedSourceDelayedRead
            name = kSourceDelayedRead
        if trans is not None:
            time = getTime(l)
            if startTime == 0:
                startTime = time
            time = time - startTime
            # the stream number is embedded in the text as "stream = N"
            streamIndex = l.find("stream = ")
            stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
            maxNameSize = max(maxNameSize, len(name))

            if trans == kFinishedSource and not stream in streamsThatSawFirstEvent:
                # This is wrong but there is no way to estimate the time better
                # because there is no previous event for the first event.
                processingSteps.append((name,kStartedSource,stream,time,True))
                streamsThatSawFirstEvent.add(stream)

            processingSteps.append((name,trans,stream,time, True))
            numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps,numStreams,maxNameSize)
271 
    def __init__(self,f):
        # Tracer logs are parsed eagerly: the whole list of steps is built up
        # front (unlike the StallMonitor parser, which re-reads lazily).
        self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)
    def processingSteps(self):
        """Return the already-parsed list of (name, transition, stream, time, isEvent) steps."""
        return self._processingSteps
277 
278 #----------------------------------------------
def chooseParser(inputFile):
    """Peek at the first and fifth lines of inputFile to pick the parser class.

    Returns StallMonitorParser or TracerParser (the class, not an instance)
    and rewinds the file; sets the module-level kTracerInput flag for Tracer
    input. Exits the program for an unrecognized format.
    """
    firstLine = inputFile.readline().rstrip()
    for i in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if (firstLine.find("# Transition") != -1) or (firstLine.find("# Step") != -1):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if firstLine.find("++") != -1 or fifthLine.find("++") != -1:
        global kTracerInput
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser
    else:
        inputFile.close()
        print("Unknown input format.")
        # use sys.exit: the bare exit() helper comes from the site module and
        # is not guaranteed to exist in all execution contexts
        sys.exit(1)
300 
301 #----------------------------------------------
def readLogFile(inputFile):
    """Detect the log format of inputFile and return a constructed parser for it."""
    return chooseParser(inputFile)(inputFile)
305 
306 #----------------------------------------------
307 #
308 # modules: The time between prefetch finished and 'start processing' is
309 # the time it took to acquire any resources which is by definition the
310 # stall time.
311 #
312 # source: The source just records how long it spent doing work,
313 # not how long it was stalled. We can get a lower bound on the stall
314 # time for delayed reads by measuring the time the stream was doing
315 # no work up till the start of the source delayed read.
316 #
def findStalledModules(processingSteps, numStreams):
    """Collect, per module, every wait that exceeded kStallThreshold.

    A module's wait is the gap between the end of its prefetch and its start
    (or acquire start); for a delayed source read on an idle stream it is the
    idle gap since the stream last finished work. Returns a dict mapping
    module name -> list of stall times (microseconds).
    """
    lastWorkTime = [0] * numStreams   # time each stream last finished work
    runningCount = [0] * numStreams   # modules currently running per stream
    stalls = {}
    prefetchDone = [{} for _ in range(numStreams)]  # module -> prefetch-end time

    for name, trans, stream, t, isEvent in processingSteps:
        gap = None
        pending = prefetchDone[stream]

        if trans == kPrefetchEnd:
            pending[name] = t
        elif trans in (kStarted, kStartedAcquire):
            if name in pending:
                gap = t - pending[name]
            pending.pop(name, None)
            runningCount[stream] += 1
        elif trans in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastWorkTime[stream] = t
        elif trans == kStartedSourceDelayedRead:
            # only an idle stream counts as stalled waiting on the source
            if runningCount[stream] == 0:
                gap = t - lastWorkTime[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastWorkTime[stream] = t

        if gap is not None and gap > kStallThreshold:
            stalls.setdefault(name, []).append(gap)
    return stalls
348 
349 
def createModuleTiming(processingSteps, numStreams):
    """Write 'module-timings.json' mapping each module label to its per-execution
    run times (milliseconds) for event transitions.

    processingSteps: iterable of (name, transition, stream, time, isEvent).
    numStreams: number of streams, used to track per-stream start times.
    """
    import json
    moduleTimings = defaultdict(list)
    # per-stream map of module -> time its processing started
    modulesActiveOnStream = [defaultdict(int) for x in range(numStreams)]
    for n, trans, s, time, isEvent in processingSteps:
        modulesOnStream = modulesActiveOnStream[s]
        if isEvent:
            if trans == kStarted:
                modulesOnStream[n] = time
            elif trans == kFinished:
                # elapsed run time of this module execution (was misleadingly
                # called waitTime); convert usec -> msec for the output
                runTime = time - modulesOnStream[n]
                modulesOnStream.pop(n, None)
                moduleTimings[n].append(float(runTime/1000.))

    with open('module-timings.json', 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
371 
372 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one line of 'ASCII art' per start/stop transition.

    Each line shows: module name, '+' (starting) or '-' (finishing), the count
    of running modules on every stream ('*' marks the source's own stream),
    the time (ms) and stream id, then the wait time with a trailing "STALLED"
    marker when it exceeds kStallThreshold.
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    modulesActiveOnStreams = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        modulesActiveOnStream = modulesActiveOnStreams[s]
        if trans == kPrefetchEnd:
            # remember when prefetching ended; nothing is printed for it
            modulesActiveOnStream[n] = time
            continue
        elif trans == kStartedAcquire or trans == kStarted:
            if n in modulesActiveOnStream:
                waitTime = time - modulesActiveOnStream[n]
            modulesActiveOnStream.pop(n, None)
            streamState[s] +=1
        elif trans == kFinishedAcquire or trans == kFinished:
            streamState[s] -=1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            # an idle stream waiting on a delayed read counts as waiting
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesActiveOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        states = "%-*s: " % (maxNameSize,n)
        if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
            states +="+ "
        else:
            states +="- "
        for index, state in enumerate(streamState):
            if n==kSourceFindEvent and index == s:
                states +="* "
            else:
                states +=str(state)+" "
        states += " -- " + str(time/1000.) + " " + str(s) + " "
        if waitTime is not None:
            states += " %.2f"% (waitTime/1000.)
            if waitTime > kStallThreshold:
                states += " STALLED"

        print(states)
415 
416 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules sorted by total stall time, descending.

    stalledModules: dict mapping module name -> list of stall times (usec),
    as produced by findStalledModules. Each module's individual stall times
    are printed largest first, converted to milliseconds.
    """
    priorities = []
    maxNameSize = 0
    for name, stallTimes in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        stallTimes.sort(reverse=True)
        priorities.append((name, sum(stallTimes), stallTimes))

    # Sort by total stall time, largest first. The previous code used
    # cmp()/sort(cmp=...), both of which were removed in Python 3.
    priorities.sort(key=itemgetter(1), reverse=True)

    nameColumn = "Stalled Module"
    maxNameSize = max(maxNameSize, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
    for n,s,t in priorities:
        paddedName = "%-*s:" % (maxNameSize,n)
        print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
439 
440 #--------------------------------------------------------
class Point:
    """A 2-D point; x is a time, y is a +1/-1 change in running-module count."""

    def __init__(self, x_, y_):
        self.x = x_
        self.y = y_

    def __str__(self):
        return "(x: {}, y: {})".format(self.x, self.y)

    # repr is identical to str for these simple value objects
    __repr__ = __str__
451 
452 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge consecutive Points sharing the same x by summing their y values.

    ps must already be sorted by x. Points whose merged y is 0 are dropped,
    since they represent no net change in the running-module count.
    """
    if len(ps) < 2:
        return ps
    reducedPoints = []
    tmp = Point(ps[0].x, ps[0].y)
    for p in ps[1:]:
        if tmp.x == p.x:
            tmp.y += p.y
        else:
            reducedPoints.append(tmp)
            tmp = Point(p.x, p.y)
    reducedPoints.append(tmp)
    reducedPoints = [p for p in reducedPoints if p.y != 0]
    return reducedPoints
467 
468 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (start, length) pairs into edge Points: +1 at each interval start,
    -1 at each interval end, sorted by time. Zero-length entries are skipped.
    All the +1 edges of a pair list are emitted before its -1 edges so that
    the stable sort keeps ties in the same order as before."""
    points = []
    for pairList in pairLists:
        nonEmpty = [pair for pair in pairList if pair[1] != 0]
        points.extend(Point(pair[0], 1) for pair in nonEmpty)
        points.extend(Point(pair[0] + pair[1], -1) for pair in nonEmpty)
    points.sort(key=attrgetter('x'))
    return points
476 
# Label for stack-style graphs (not referenced in the visible code — TODO confirm external use)
stackType = 'stack'
478 
479 # --------------------------------------------
class Stack:
    """Accumulates per-color layers for the stacked plot.

    Each update() merges the new points with the previous (topmost) layer so
    data holds cumulative (graphType, points) layers, bottom first.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        # Copy the argument: the previous code did 'tmp = points' and then
        # 'tmp += ...', silently extending the caller's list in place.
        tmp = list(points)
        if len(self.data) != 0:
            tmp += self.data[-1][1]

        tmp.sort(key=attrgetter('x'))
        tmp = reduceSortedPoints(tmp)
        self.data.append((graphType, tmp))
492 
493 #---------------------------------------------
494 # StreamInfoElement
    def __init__(self, begin_, delta_, color_):
        """Record one colored time interval for a stream's lowest plot row."""
        self.begin = begin_   # interval start time (microseconds)
        self.delta = delta_   # interval duration (microseconds)
        self.color = color_   # matplotlib color name used when plotting
500 
    def unpack(self):
        """Return the element as a (begin, delta, color) tuple."""
        return self.begin, self.delta, self.color
503 
504 #----------------------------------------------
505 # Consolidating contiguous blocks with the same color
506 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge per-stream StreamInfoElements that touch and share a color.

    Consolidating contiguous blocks with the same color drastically reduces
    the size of the pdf file. Returns a new list of per-stream lists; the
    input lists are not modified.
    """
    oldStreamInfo = streamInfo
    streamInfo = [[] for x in range(numStreams)]

    for s in range(numStreams):
        # streams with no recorded intervals stay empty
        if oldStreamInfo[s]:
            lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].unpack()
            for info in oldStreamInfo[s][1:]:
                start,length,color = info.unpack()
                if color == lastColor and lastStartTime+lastTimeLength == start:
                    # same color and exactly adjacent: extend the pending block
                    lastTimeLength += length
                else:
                    streamInfo[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))
                    lastStartTime = start
                    lastTimeLength = length
                    lastColor = color
            # flush the final pending block
            streamInfo[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))

    return streamInfo
526 
527 #----------------------------------------------
528 # Consolidating contiguous blocks with the same color drastically
529 # reduces the size of the pdf file. Same functionality as the
530 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge adjacent (start, length, height) tuples that share a height and
    are exactly contiguous (previous start + length == next start).

    Same consolidation idea as consolidateContiguousBlocks, expressed on
    plain tuples. Returns a new list; an empty input yields an empty list.
    """
    if not blocks:
        return []

    merged = []
    curStart, curLength, curHeight = blocks[0]
    for start, length, height in blocks[1:]:
        if height == curHeight and curStart + curLength == start:
            curLength += length
        else:
            merged.append((curStart, curLength, curHeight))
            curStart, curLength, curHeight = start, length, height
    merged.append((curStart, curLength, curHeight))

    return merged
550 
551 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first for one stream and/or record interval
    times for the stacked plot.

    points: Point objects (+1 start / -1 stop edges) for one stream.
    allStackTimes: dict color -> list of (start, length) pairs; extended in place.
    ax: matplotlib axes used when doPlot is True.
    stream, height: vertical placement of this stream's bars.
    streamHeightCut: skip intervals with fewer concurrent modules than this.
    doPlot / addToStackTimes: enable plotting and/or stack accounting.
    color: bar color; threadOffset: subtracted from the concurrency count when
    weighting stack contributions.
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    for t1,t2 in zip(points, points[1:]):
        # running sum of edges = number of concurrent modules in [t1.x, t2.x)
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            theTimes = [(t[0]/1000.,t[1]/1000.) for t in theTS]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            # weight each interval by how many modules above the offset were running
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
579 
580 #----------------------------------------------
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
    """Build the per-stream stall graph (and optional stacked plot) and save it.

    pdfFile: output file name. shownStacks: also draw the stacked-count subplot.
    processingSteps: iterable of (name, transition, stream, time, isEvent).
    stalledModuleInfo: dict of stalled module name -> stall times; those
    modules are drawn in red. displayExternalWork: show external-work
    intervals. checkOrder: repair out-of-order transitions (needed for Tracer
    input). Relies on the module-level 'plt' imported in the __main__ section.
    """
    # py3 fix: dict.iterkeys() no longer exists; iterating a dict yields its keys
    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [set() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # module names running on this stream before this transition
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                # this module's external work is done; it resumes running normally
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            # the color of the lowest-row interval encodes what the stream was doing
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                for n in moduleNames:
                    if n in stalledModuleNames:
                        c="red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000., x.delta/1000.) for x in lowestRow] # Scale from msec to sec.
        colors=[x.color for x in lowestRow]
        # for each stream, plot the lowest row
        ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            height = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000.,e[1]/1000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        axStack.tick_params(top='off')

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
820 
821 #=======================================
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
 by itself, the default file name is \'stall.pdf\'. Otherwise, the
 argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
 Can be used only when -g is specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file named module-timings.json.''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = False
    if args.timings:
        doModuleTimings = True

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        # restrict the output name to a safe character set
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            # sys.exit rather than the site-module exit() helper
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if not extension in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        # Tracer output is known to contain out-of-order transitions
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def consolidateContiguousBlocks(numStreams, streamInfo)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def createModuleTiming(processingSteps, numStreams)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def processingStepsFromStallMonitorOutput(f, moduleNames)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
#define str(s)
double split
Definition: MVATrainer.cc:139
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run