CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from itertools import groupby
4 from operator import attrgetter,itemgetter
5 import sys
6 from collections import defaultdict
7 import six
8 
9 #----------------------------------------------
def printHelp():
    """Return the multi-line usage text that is shown as the argparse epilog."""
    s = '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
    return s
52 
# A wait longer than this is reported as a stall.
kStallThreshold=100000 #in microseconds
# Set to True by chooseParser() when the input is Tracer output.
kTracerInput=False

#Stream states
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"

#----------------------------------------------
def processingStepsFromStallMonitorOutput(f, moduleNames):
    """Generator yielding (name, transition, stream, time, isEvent) tuples
    parsed from the StallMonitor log lines in *f*.

    *moduleNames* maps module id strings found in the log to module labels.
    NOTE: the ``def`` line of this function was lost in the extracted copy
    and has been restored (the name appears in the file's cross references).
    """
    for rawl in f:
        l = rawl.strip()
        # skip blank lines and comment lines
        if not l or l[0] == '#':
            continue
        (step, payload) = tuple(l.split(None, 1))
        payload = payload.split()

        # Ignore these
        if step == 'E' or step == 'e':
            continue

        # Payload format is:
        #  <stream id> <..other fields..> <time since begin job>
        stream = int(payload[0])
        time = int(payload[-1])
        trans = None
        isEvent = True

        name = None
        # 'S' = begin of event creation in source
        # 's' = end of event creation in source
        if step == 'S' or step == 's':
            name = kSourceFindEvent
            trans = kStartedSource
            # The start of an event is the end of the framework part
            if step == 's':
                trans = kFinishedSource
        else:
            # moduleID is the second payload argument for all steps below
            moduleID = payload[1]

            # 'p' = end of module prefetching
            # 'M' = begin of module processing
            # 'm' = end of module processing
            if step == 'p' or step == 'M' or step == 'm':
                trans = kStarted
                if step == 'p':
                    trans = kPrefetchEnd
                elif step == 'm':
                    trans = kFinished
                if step == 'm' or step == 'M':
                    # third payload field is 0 for event transitions
                    isEvent = (int(payload[2]) == 0)
                name = moduleNames[moduleID]

            # 'A' = begin of module acquire function
            # 'a' = end of module acquire function
            elif step == 'A' or step == 'a':
                trans = kStartedAcquire
                if step == 'a':
                    trans = kFinishedAcquire
                name = moduleNames[moduleID]

            # Delayed read from source
            # 'R' = begin of delayed read from source
            # 'r' = end of delayed read from source
            elif step == 'R' or step == 'r':
                trans = kStartedSourceDelayedRead
                if step == 'r':
                    trans = kFinishedSourceDelayedRead
                name = kSourceDelayedRead

        if trans is not None:
            yield (name, trans, stream, time, isEvent)

    return
138 
class StallMonitorParser(object):
    """Reads a StallMonitor log file, determining the number of streams and
    the module id -> label mapping up front; processing steps are produced
    lazily by processingSteps().

    NOTE: the ``class`` statement and the final ``return`` of
    processingSteps() were lost in the extracted copy and have been restored
    (chooseParser() returns this class and expects parser(f).processingSteps()
    to yield steps).
    """
    def __init__(self, f):
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    #found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (id, name) = tuple(l[2:].split())
                moduleNames[id] = name
                continue
        self._f = f
        if numStreams == 0:
            numStreams = numStreamsFromSource + 1
        self.numStreams = numStreams
        self._moduleNames = moduleNames
        self.maxNameSize = 0
        # BUGFIX: the original iterated six.iteritems() and took len() of the
        # (id, name) tuples, which is always 2; measure the label lengths.
        for name in moduleNames.values():
            self.maxNameSize = max(self.maxNameSize, len(name))
        self.maxNameSize = max(self.maxNameSize, len(kSourceDelayedRead))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f, self._moduleNames)
176 
177 #----------------------------------------------
178 # Utility to get time out of Tracer output text format
def getTime(line):
    """Convert the timestamp in *line* (second whitespace-separated field,
    formatted HH:MM:SS.sss) into an integer number of microseconds."""
    hours, minutes, seconds = line.split(" ")[1].split(":")
    totalSeconds = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    return int(1000000 * totalSeconds)  # convert to microseconds
185 
186 #----------------------------------------------
187 # The next function parses the Tracer output.
188 # Here are some differences to consider if you use Tracer output
189 # instead of the StallMonitor output.
190 # - The time in the text of the Tracer output is not as precise
191 # as the StallMonitor (.01 s vs .001 s)
192 # - The MessageLogger bases the time on when the message printed
193 # and not when it was initially queued up to print which smears
194 # the accuracy of the times.
195 # - Both of the previous things can produce some strange effects
196 # in the output plots.
197 # - The file size of the Tracer text file is much larger.
198 # - The CPU work needed to parse the Tracer files is larger.
199 # - The Tracer log file is expected to have "++" in the first
200 # or fifth line. If there are extraneous lines at the beginning
201 # you have to remove them.
202 # - The ascii printout out will have one extraneous line
203 # near the end for the SourceFindEvent start.
204 # - The only advantage I can see is that you have only
205 # one output file to handle instead of two, the regular
206 # log file and the StallMonitor output.
207 # We might should just delete the Tracer option because it is
208 # clearly inferior ...
def parseTracerOutput(f):
    """Parse a Tracer-service log file *f* and return the tuple
    (processingSteps, numStreams, maxNameSize), where processingSteps is a
    list of (name, transition, stream, time, isEvent) tuples.

    Closes *f* when done. NOTE: the ``def`` line of this function was lost in
    the extracted copy and has been restored (TracerParser.__init__ calls
    parseTracerOutput(f)).
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for l in f:
        trans = None
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if l.find("processing event :") != -1:
            name = kSourceFindEvent
            trans = kStartedSource
            # the end of the source is estimated using the start of the event
            if l.find("starting:") != -1:
                trans = kFinishedSource
        elif l.find("processing event for module") != -1:
            trans = kStarted
            if l.find("finished:") != -1:
                if l.find("prefetching") != -1:
                    trans = kPrefetchEnd
                else:
                    trans = kFinished
            else:
                if l.find("prefetching") != -1:
                    #skip this since we don't care about prefetch starts
                    continue
            name = l.split("'")[1]
        elif l.find("processing event acquire for module:") != -1:
            trans = kStartedAcquire
            if l.find("finished:") != -1:
                trans = kFinishedAcquire
            name = l.split("'")[1]
        elif l.find("event delayed read from source") != -1:
            trans = kStartedSourceDelayedRead
            if l.find("finished:") != -1:
                trans = kFinishedSourceDelayedRead
            name = kSourceDelayedRead
        if trans is not None:
            time = getTime(l)
            if startTime == 0:
                startTime = time
            time = time - startTime
            streamIndex = l.find("stream = ")
            stream = int(l[streamIndex+9:l.find(" ", streamIndex+10)])
            maxNameSize = max(maxNameSize, len(name))

            if trans == kFinishedSource and not stream in streamsThatSawFirstEvent:
                # This is wrong but there is no way to estimate the time better
                # because there is no previous event for the first event.
                processingSteps.append((name, kStartedSource, stream, time, True))
                streamsThatSawFirstEvent.add(stream)

            processingSteps.append((name, trans, stream, time, True))
            numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps, numStreams, maxNameSize)
270 
class TracerParser(object):
    """Gives Tracer log files the same interface as StallMonitorParser.

    NOTE: the ``class`` statement was lost in the extracted copy and has been
    restored (chooseParser() returns TracerParser).
    """
    def __init__(self, f):
        # Tracer files are parsed eagerly; the tuple layout comes from parseTracerOutput.
        self._processingSteps, self.numStreams, self.maxNameSize = parseTracerOutput(f)

    def processingSteps(self):
        """Return the parsed list of (name, transition, stream, time, isEvent) tuples."""
        return self._processingSteps
276 
277 #----------------------------------------------
def chooseParser(inputFile):
    """Inspect the first and fifth lines of *inputFile* to decide which parser
    class handles its format, rewinding the file before returning.

    Exits the program when the format is not recognized. Sets the module-level
    kTracerInput flag when Tracer output is detected.
    """
    firstLine = inputFile.readline().rstrip()
    for _ in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0)  # Rewind back to beginning

    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if ("++" in firstLine) or ("++" in fifthLine):
        global kTracerInput
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    exit(1)
299 
300 #----------------------------------------------
def readLogFile(inputFile):
    """Pick the parser class matching *inputFile*'s format and return an
    instance of it constructed on the file."""
    parserClass = chooseParser(inputFile)
    return parserClass(inputFile)
304 
305 #----------------------------------------------
306 #
307 # modules: The time between prefetch finished and 'start processing' is
308 # the time it took to acquire any resources which is by definition the
309 # stall time.
310 #
311 # source: The source just records how long it spent doing work,
312 # not how long it was stalled. We can get a lower bound on the stall
313 # time for delayed reads by measuring the time the stream was doing
314 # no work up till the start of the source delayed read.
315 #
def findStalledModules(processingSteps, numStreams):
    """Return a dict {module name: [stall times in microseconds]} for modules
    that waited longer than kStallThreshold between the end of their
    prefetching and the start of their running (or, for source delayed reads,
    the time the stream sat idle before the read started).

    BUGFIX: xrange is Python 2 only; use range so the script also runs under
    Python 3 (consistent with the file's six/__future__ usage).
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    stalledModules = {}
    # per stream: module name -> time its prefetch finished
    modulesActiveOnStream = [{} for x in range(numStreams)]
    for n, trans, s, time, isEvent in processingSteps:

        waitTime = None
        modulesOnStream = modulesActiveOnStream[s]
        if trans == kPrefetchEnd:
            modulesOnStream[n] = time
        elif trans == kStarted or trans == kStartedAcquire:
            if n in modulesOnStream:
                waitTime = time - modulesOnStream[n]
            modulesOnStream.pop(n, None)
            streamState[s] += 1
        elif trans == kFinished or trans == kFinishedAcquire:
            streamState[s] -= 1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            # stream idle since the last module finished -> lower bound on stall
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        if waitTime is not None:
            if waitTime > kStallThreshold:
                t = stalledModules.setdefault(n, [])
                t.append(waitTime)
    return stalledModules
347 
348 
def createModuleTiming(processingSteps, numStreams):
    """Write module-timings.json mapping each module label to the list of its
    run times in milliseconds (event transitions only).

    BUGFIX: xrange is Python 2 only; use range so the script also runs under
    Python 3.
    """
    import json
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    moduleTimings = defaultdict(list)
    # per stream: module name -> time it started running
    modulesActiveOnStream = [defaultdict(int) for x in range(numStreams)]
    for n, trans, s, time, isEvent in processingSteps:
        waitTime = None
        modulesOnStream = modulesActiveOnStream[s]
        if isEvent:
            if trans == kStarted:
                streamState[s] = 1
                modulesOnStream[n] = time
            elif trans == kFinished:
                waitTime = time - modulesOnStream[n]
                modulesOnStream.pop(n, None)
                streamState[s] = 0
                moduleTimings[n].append(float(waitTime/1000.))

    with open('module-timings.json', 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
370 
371 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one line of 'ASCII art' per transition: the module name, a '+'
    (start) or '-' (stop) symbol, the number of running modules per stream,
    the time (ms) and stream id, plus the wait time and a STALLED marker when
    the wait exceeded kStallThreshold.

    BUGFIX: xrange is Python 2 only; use range so the script also runs under
    Python 3.
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    modulesActiveOnStreams = [{} for x in range(numStreams)]
    for n, trans, s, time, isEvent in processingSteps:
        waitTime = None
        modulesActiveOnStream = modulesActiveOnStreams[s]
        if trans == kPrefetchEnd:
            modulesActiveOnStream[n] = time
            continue
        elif trans == kStartedAcquire or trans == kStarted:
            if n in modulesActiveOnStream:
                waitTime = time - modulesActiveOnStream[n]
            modulesActiveOnStream.pop(n, None)
            streamState[s] += 1
        elif trans == kFinishedAcquire or trans == kFinished:
            streamState[s] -= 1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesActiveOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        states = "%-*s: " % (maxNameSize, n)
        if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
            states += "+ "
        else:
            states += "- "
        for index, state in enumerate(streamState):
            if n == kSourceFindEvent and index == s:
                states += "* "
            else:
                states += str(state)+" "
        states += " -- " + str(time/1000.) + " " + str(s) + " "
        if waitTime is not None:
            states += " %.2f" % (waitTime/1000.)
            if waitTime > kStallThreshold:
                states += " STALLED"

        print(states)
414 
415 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print modules sorted by decreasing total stall time, one line per
    module with its total (ms) and the individual stall times (ms, largest
    first).

    BUGFIX: list.sort(cmp=...) and the cmp builtin were removed in Python 3;
    sort on the total stall time with a key function instead (same ordering,
    both sorts are stable). six.iteritems is replaced by the equivalent
    dict.items().
    """
    priorities = []
    maxNameSize = 0
    for name, t in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        t.sort(reverse=True)
        priorities.append((name, sum(t), t))

    # descending by total stall time (element 1 of each tuple)
    priorities.sort(key=itemgetter(1), reverse=True)

    nameColumn = "Stalled Module"
    maxNameSize = max(maxNameSize, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
    for n, s, t in priorities:
        paddedName = "%-*s:" % (maxNameSize, n)
        print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
438 
439 #--------------------------------------------------------
class Point:
    """A simple 2D point; used as (time, +/-1) edges when histogramming
    stream activity."""

    def __init__(self, x_, y_):
        self.x = x_
        self.y = y_

    def __str__(self):
        return "(x: {}, y: {})".format(self.x, self.y)

    def __repr__(self):
        return str(self)
450 
451 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Collapse consecutive Points with equal x into one Point whose y is the
    sum of their y values, then drop Points with y == 0. *ps* must already be
    sorted by x.

    NOTE: the ``def`` line of this function was lost in the extracted copy
    and has been restored (it is called by Stack.update and
    plotPerStreamAboveFirstAndPrepareStack).
    """
    if len(ps) < 2:
        return ps
    reducedPoints = []
    tmp = Point(ps[0].x, ps[0].y)
    for p in ps[1:]:
        if tmp.x == p.x:
            tmp.y += p.y
        else:
            reducedPoints.append(tmp)
            tmp = Point(p.x, p.y)
    reducedPoints.append(tmp)
    reducedPoints = [p for p in reducedPoints if p.y != 0]
    return reducedPoints
466 
467 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Convert (begin, width) pairs into rising (+1 at begin) and falling
    (-1 at begin+width) edge Points, skipping zero-width pairs; the result is
    sorted by x."""
    edges = []
    for pairList in pairLists:
        nonEmpty = [pair for pair in pairList if pair[1] != 0]
        edges += [Point(pair[0], 1) for pair in nonEmpty]
        edges += [Point(pair[0] + pair[1], -1) for pair in nonEmpty]
    edges.sort(key=attrgetter('x'))
    return edges
475 
stackType = 'stack'

# --------------------------------------------
class Stack:
    """Accumulates, color by color, the cumulative edge-point lists used to
    draw the stacked plot; data is a list of (graphType, points) tuples."""

    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Fold *points* into the previous cumulative list (if any), re-sort
        by x, reduce, and record the result under *graphType*."""
        combined = points
        if len(self.data) != 0:
            combined += self.data[-1][1]

        combined.sort(key=attrgetter('x'))
        combined = reduceSortedPoints(combined)
        self.data.append((graphType, combined))
491 
492 #---------------------------------------------
493 # StreamInfoElement
class StreamInfoElement:
    """One colored (begin, delta) block in a stream's lowest plot row.

    NOTE: the ``class`` statement was lost in the extracted copy and has been
    restored (instances are created in consolidateContiguousBlocks and
    createPDFImage).
    """
    def __init__(self, begin_, delta_, color_):
        self.begin = begin_
        self.delta = delta_
        self.color = color_

    def unpack(self):
        """Return the (begin, delta, color) tuple."""
        return self.begin, self.delta, self.color
502 
503 #----------------------------------------------
504 # Consolidating contiguous blocks with the same color
505 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge adjacent StreamInfoElements of the same color whose time ranges
    touch; this drastically reduces the size of the pdf file.

    BUGFIX: xrange is Python 2 only; use range so the script also runs under
    Python 3.
    """
    oldStreamInfo = streamInfo
    streamInfo = [[] for x in range(numStreams)]

    for s in range(numStreams):
        if oldStreamInfo[s]:
            lastStartTime, lastTimeLength, lastColor = oldStreamInfo[s][0].unpack()
            for info in oldStreamInfo[s][1:]:
                start, length, color = info.unpack()
                if color == lastColor and lastStartTime+lastTimeLength == start:
                    lastTimeLength += length
                else:
                    streamInfo[s].append(StreamInfoElement(lastStartTime, lastTimeLength, lastColor))
                    lastStartTime = start
                    lastTimeLength = length
                    lastColor = color
            streamInfo[s].append(StreamInfoElement(lastStartTime, lastTimeLength, lastColor))

    return streamInfo
525 
526 #----------------------------------------------
527 # Consolidating contiguous blocks with the same color drastically
528 # reduces the size of the pdf file. Same functionality as the
529 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge adjacent (start, length, height) tuples with equal height whose
    time ranges touch; same idea as consolidateContiguousBlocks but for plain
    tuples.

    NOTE: the ``def`` line of this function was lost in the extracted copy
    and has been restored (it is called from
    plotPerStreamAboveFirstAndPrepareStack and createPDFImage).
    """
    oldBlocks = blocks

    blocks = []
    if not oldBlocks:
        return blocks

    lastStartTime, lastTimeLength, lastHeight = oldBlocks[0]
    for start, length, height in oldBlocks[1:]:
        if height == lastHeight and lastStartTime+lastTimeLength == start:
            lastTimeLength += length
        else:
            blocks.append((lastStartTime, lastTimeLength, lastHeight))
            lastStartTime = start
            lastTimeLength = length
            lastHeight = height
    blocks.append((lastStartTime, lastTimeLength, lastHeight))

    return blocks
549 
550 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot rows above the first for one stream and/or accumulate times for
    the stack plot.

    points: edge Points (+1/-1 at a time) for this stream.
    allStackTimes: dict color -> list of (begin, delta) pairs, extended when
        addToStackTimes is True.
    ax: matplotlib axes drawn on (broken_barh) when doPlot is True.
    stream: stream index (sets the vertical position of the bars).
    height: height of one row.
    streamHeightCut: rows below this concurrency level are skipped (the first
        row was already plotted elsewhere).
    threadOffset: subtracted from the concurrency count when replicating the
        time spans into allStackTimes.
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    # walk consecutive point pairs, tracking the running concurrency level
    for t1, t2 in zip(points, points[1:]):
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x, t2.x-t1.x, streamHeight))
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    # group the merged blocks by concurrency level; one broken_barh per level
    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0], t[1]) for t in ts]
        if doPlot:
            theTimes = [(t[0]/1000., t[1]/1000.) for t in theTS]  # ms -> s
            yspan = (stream-0.4+height, height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            # replicate each span once per counted concurrency level
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
578 
579 #----------------------------------------------
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
    """Create the stall-graph PDF (and optionally the stack subplot) and save
    it to *pdfFile*.

    processingSteps: iterable of (name, transition, stream, time, isEvent).
    stalledModuleInfo: dict from findStalledModules; its keys are colored red.
    displayExternalWork: include external-work (acquire/finish) intervals.
    checkOrder: repair out-of-order transitions (always on for Tracer input).

    BUGFIXES for Python 3 compatibility (consistent with the file's
    six/__future__ usage): dict.iterkeys() no longer exists -> iterate the
    dict directly; xrange -> range.
    """
    stalledModuleNames = set(x for x in stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [set() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n, trans, s, time, isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -= timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource:
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time, 1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource:
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time, +1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time, -1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            c = "green"
            if not isEvent:
                c = "limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                for n in moduleNames:
                    if n in stalledModuleNames:
                        c = "red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks:
        [xH, yH] = fig.get_size_inches()
        fig.set_size_inches(xH, yH*4/3)
        ax = plt.subplot2grid((4, 1), (0, 0), rowspan=3)
        axStack = plt.subplot2grid((4, 1), (3, 0))

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5, numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes = {'green': [], 'limegreen': [], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream, lowestRow in enumerate(streamLowestRow):
        times = [(x.begin/1000., x.delta/1000.) for x in lowestRow] # Scale from msec to sec.
        colors = [x.color for x in lowestRow]
        # for each stream, plot the lowest row
        ax.broken_barh(times, (iStream-0.4, height), facecolors=colors, edgecolors=colors, linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i, perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green', 'limegreen', 'blue', 'red', 'orange', 'darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            height = 0
            xs = []
            for p1, p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key=itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000., e[1]/1000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        axStack.tick_params(top='off')

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
819 
820 #=======================================
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
                        by itself, the default file name is \'stall.pdf\'. Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        # BUGFIX: the help said "filea" and "modules-timings.json";
                        # the script actually writes module-timings.json.
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file module-timings.json.''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = False
    if args.timings:
        doModuleTimings = True

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if not extension in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        # order repair is mandatory for Tracer input
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def consolidateContiguousBlocks(numStreams, streamInfo)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def createModuleTiming(processingSteps, numStreams)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def processingStepsFromStallMonitorOutput(f, moduleNames)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
#define str(s)
double split
Definition: MVATrainer.cc:139
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run