CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from itertools import groupby
4 from operator import attrgetter,itemgetter
5 import sys
6 from collections import defaultdict
7 import six
8 
9 #----------------------------------------------
def printHelp():
    """Return the long usage notes shown after the option list in --help.

    The text is handed to argparse as a raw epilog, so the leading newline
    and the line breaks are part of what the user sees.
    """
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
52 
# A wait longer than this (in milliseconds) is reported as a stall.
kStallThreshold=100 #in milliseconds
# Set to True by chooseParser() when the input is Tracer output.
kTracerInput=False

#Stream states
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"
70 
71 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f, moduleNames):
    """Yield one processing step per relevant line of a StallMonitor log.

    f           -- iterable of text lines (the open log file)
    moduleNames -- dict mapping the module ID string found in the log to
                   the module name

    Each yielded item is the tuple (name, transition, stream, time, isEvent)
    where 'transition' is one of the k* stream-state constants. Blank lines,
    '#' comment lines and 'E'/'e' steps are skipped.
    """
    for rawl in f:
        l = rawl.strip()
        if not l or l[0] == '#':
            continue
        (step, payload) = tuple(l.split(None, 1))
        payload = payload.split()

        # Ignore these
        if step == 'E' or step == 'e':
            continue

        # Payload format is:
        # <stream id> <..other fields..> <time since begin job>
        stream = int(payload[0])
        time = int(payload[-1])
        trans = None
        isEvent = True

        name = None
        # 'S' = begin of event creation in source
        # 's' = end of event creation in source
        if step == 'S' or step == 's':
            name = kSourceFindEvent
            trans = kStartedSource
            # The start of an event is the end of the framework part
            if step == 's':
                trans = kFinishedSource
        else:
            # moduleID is the second payload argument for all steps below
            moduleID = payload[1]

            # 'p' = end of module prefetching
            # 'M' = begin of module processing
            # 'm' = end of module processing
            if step == 'p' or step == 'M' or step == 'm':
                trans = kStarted
                if step == 'p':
                    trans = kPrefetchEnd
                elif step == 'm':
                    trans = kFinished
                if step == 'm' or step == 'M':
                    isEvent = (int(payload[2]) == 0)
                name = moduleNames[moduleID]

            # 'A' = begin of module acquire function
            # 'a' = end of module acquire function
            elif step == 'A' or step == 'a':
                trans = kStartedAcquire
                if step == 'a':
                    trans = kFinishedAcquire
                name = moduleNames[moduleID]

            # Delayed read from source
            # 'R' = begin of delayed read from source
            # 'r' = end of delayed read from source
            elif step == 'R' or step == 'r':
                trans = kStartedSourceDelayedRead
                if step == 'r':
                    trans = kFinishedSourceDelayedRead
                name = kSourceDelayedRead

        if trans is not None:
            yield (name, trans, stream, time, isEvent)
138 
class StallMonitorParser(object):
    """Reader for StallMonitor log files.

    Attributes set by the constructor:
      numStreams  -- number of streams found in the log
      maxNameSize -- length of the longest name that can appear in the output
    """

    def __init__(self, f):
        """Scan the log once to find the stream count and the module names.

        f -- open, seekable StallMonitor log file; it is kept and re-read by
             processingSteps().
        """
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    #found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID, name) = tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
        self._f = f
        if numStreams == 0:
            # No global begin run seen: fall back to the largest stream
            # number reported by the source.
            numStreams = numStreamsFromSource + 1
        self.numStreams = numStreams
        self._moduleNames = moduleNames
        # Measure the names themselves. Iterating over the (id, name) items
        # would measure the 2-tuples and always give 2.
        self.maxNameSize = 0
        for name in moduleNames.values():
            self.maxNameSize = max(self.maxNameSize, len(name))
        self.maxNameSize = max(self.maxNameSize, len(kSourceDelayedRead))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f, self._moduleNames)
176 
177 #----------------------------------------------
178 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the timestamp of a Tracer output line in milliseconds.

    The second space-separated field of 'line' must look like HH:MM:SS(.fff).
    """
    fields = line.split(" ")[1].split(":")
    seconds = int(fields[0])*60*60 + int(fields[1])*60 + float(fields[2])
    # Round instead of truncating: 1000*seconds can land a hair below the
    # exact value in floating point, and truncation would lose a millisecond.
    return int(round(1000*seconds))
185 
186 #----------------------------------------------
187 # The next function parses the Tracer output.
188 # Here are some differences to consider if you use Tracer output
189 # instead of the StallMonitor output.
190 # - The time in the text of the Tracer output is not as precise
191 # as the StallMonitor (.01 s vs .001 s)
192 # - The MessageLogger bases the time on when the message printed
193 # and not when it was initially queued up to print which smears
194 # the accuracy of the times.
195 # - Both of the previous things can produce some strange effects
196 # in the output plots.
197 # - The file size of the Tracer text file is much larger.
198 # - The CPU work needed to parse the Tracer files is larger.
199 # - The Tracer log file is expected to have "++" in the first
200 # or fifth line. If there are extraneous lines at the beginning
201 # you have to remove them.
202 # - The ascii printout out will have one extraneous line
203 # near the end for the SourceFindEvent start.
204 # - The only advantage I can see is that you have only
205 # one output file to handle instead of two, the regular
206 # log file and the StallMonitor output.
# We should probably just delete the Tracer option because it is
# clearly inferior ...
def parseTracerOutput(f):
    """Parse Tracer service output and close the file.

    f -- open Tracer log file (iterable of text lines)

    Returns (processingSteps, numStreams, maxNameSize) where processingSteps
    is a list of (name, transition, stream, time, isEvent) tuples. Times are
    in milliseconds relative to the first recognised line; isEvent is always
    True for Tracer input.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    # None means "no timestamp seen yet". A 0 sentinel would misfire when
    # the first timestamp is exactly 00:00:00.
    startTime = None
    streamsThatSawFirstEvent = set()
    for l in f:
        trans = None
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if "processing event :" in l:
            name = kSourceFindEvent
            trans = kStartedSource
            # the end of the source is estimated using the start of the event
            if "starting:" in l:
                trans = kFinishedSource
        elif "processing event for module" in l:
            trans = kStarted
            if "finished:" in l:
                if "prefetching" in l:
                    trans = kPrefetchEnd
                else:
                    trans = kFinished
            else:
                if "prefetching" in l:
                    #skip this since we don't care about prefetch starts
                    continue
            name = l.split("'")[1]
        elif "processing event acquire for module:" in l:
            trans = kStartedAcquire
            if "finished:" in l:
                trans = kFinishedAcquire
            name = l.split("'")[1]
        elif "event delayed read from source" in l:
            trans = kStartedSourceDelayedRead
            if "finished:" in l:
                trans = kFinishedSourceDelayedRead
            name = kSourceDelayedRead
        if trans is not None:
            time = getTime(l)
            if startTime is None:
                startTime = time
            time = time - startTime
            streamIndex = l.find("stream = ")
            stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
            maxNameSize = max(maxNameSize, len(name))

            if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
                # This is wrong but there is no way to estimate the time better
                # because there is no previous event for the first event.
                processingSteps.append((name,kStartedSource,stream,time,True))
                streamsThatSawFirstEvent.add(stream)

            processingSteps.append((name,trans,stream,time, True))
            numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps,numStreams,maxNameSize)
270 
class TracerParser(object):
    """Reader for Tracer service output; parses the whole file up front."""

    def __init__(self, f):
        """Parse the open Tracer log 'f' and keep the resulting steps."""
        self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)

    def processingSteps(self):
        """Return the list of processing steps parsed by the constructor."""
        return self._processingSteps
276 
277 #----------------------------------------------
def chooseParser(inputFile):
    """Pick the parser class that matches the format of 'inputFile'.

    The first and fifth lines are inspected and the file is rewound.
    Returns StallMonitorParser or TracerParser. For Tracer input the global
    kTracerInput is also set to True. For an unrecognised format the file is
    closed and the program exits with status 1.
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    for _ in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit rather than the bare exit(), which only exists when the
    # site module has been loaded.
    sys.exit(1)
299 
300 #----------------------------------------------
def readLogFile(inputFile):
    """Return a parser object for 'inputFile', chosen from its format."""
    parserClass = chooseParser(inputFile)
    return parserClass(inputFile)
304 
305 #----------------------------------------------
306 #
307 # modules: The time between prefetch finished and 'start processing' is
308 # the time it took to acquire any resources which is by definition the
309 # stall time.
310 #
311 # source: The source just records how long it spent doing work,
312 # not how long it was stalled. We can get a lower bound on the stall
313 # time for delayed reads by measuring the time the stream was doing
314 # no work up till the start of the source delayed read.
315 #
def findStalledModules(processingSteps, numStreams):
    """Collect the wait times that exceed kStallThreshold, per module name.

    processingSteps -- iterable of (name, transition, stream, time, isEvent)
    numStreams      -- number of streams in the job

    Returns a dict mapping name to the list of stall times in milliseconds.
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    stalledModules = {}
    # Per stream: module name -> time its prefetch ended
    modulesActiveOnStream = [{} for _ in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:

        waitTime = None
        modulesOnStream = modulesActiveOnStream[s]
        if trans == kPrefetchEnd:
            modulesOnStream[n] = time
        elif trans == kStarted or trans == kStartedAcquire:
            if n in modulesOnStream:
                waitTime = time - modulesOnStream.pop(n)
            streamState[s] +=1
        elif trans == kFinished or trans == kFinishedAcquire:
            streamState[s] -=1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            # Only an idle stream counts as waiting for the delayed read
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        if waitTime is not None and waitTime > kStallThreshold:
            stalledModules.setdefault(n,[]).append(waitTime)
    return stalledModules
347 
348 
def createModuleTiming(processingSteps, numStreams):
    """Write the run time of every module execution to 'module-timings.yaml'.

    processingSteps -- iterable of (name, transition, stream, time, isEvent)
    numStreams      -- number of streams in the job

    For each kStarted/kFinished pair on a stream the elapsed time in seconds
    is appended to the list stored under the module name.
    """
    import yaml
    moduleTimings = defaultdict(list)
    # Per stream: module name -> start time. The int default means a finish
    # without a recorded start is measured from time 0.
    modulesActiveOnStream = [defaultdict(int) for _ in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        modulesOnStream = modulesActiveOnStream[s]
        if trans == kStarted:
            modulesOnStream[n]=time
        elif trans == kFinished:
            runTime = time - modulesOnStream[n]
            modulesOnStream.pop(n, None)
            moduleTimings[n].append(float(runTime/1000.))
    with open('module-timings.yaml', 'w') as outfile:
        outfile.write(yaml.dump(moduleTimings, default_flow_style=True))
368 
369 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one text line per start/stop transition (the 'ASCII art' graph).

    processingSteps -- iterable of (name, transition, stream, time, isEvent)
    numStreams      -- number of streams in the job
    maxNameSize     -- width used for the name column

    Each line shows the name, '+' for a start or '-' for a stop, the number
    of running modules per stream, the time in seconds and the stream. If a
    wait time is known it is appended, with ' STALLED' when it exceeds
    kStallThreshold. Prefetch-end transitions print nothing.
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    # Per stream: module name -> time its prefetch ended
    modulesActiveOnStreams = [{} for _ in range(numStreams)]
    startTransitions = (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        modulesActiveOnStream = modulesActiveOnStreams[s]
        if trans == kPrefetchEnd:
            modulesActiveOnStream[n] = time
            continue
        elif trans == kStartedAcquire or trans == kStarted:
            if n in modulesActiveOnStream:
                waitTime = time - modulesActiveOnStream.pop(n)
            streamState[s] +=1
        elif trans == kFinishedAcquire or trans == kFinished:
            streamState[s] -=1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesActiveOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        states = "%-*s: " % (maxNameSize,n)
        if trans in startTransitions:
            states +="+ "
        else:
            states +="- "
        for index, state in enumerate(streamState):
            # The stream the source is working for is marked with '*'
            if n==kSourceFindEvent and index == s:
                states +="* "
            else:
                states +=str(state)+" "
        states += " -- " + str(time/1000.) + " " + str(s) + " "
        if waitTime is not None:
            states += " %.2f"% (waitTime/1000.)
            if waitTime > kStallThreshold:
                states += " STALLED"

        print(states)
412 
413 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print the stalled modules, largest total stall time first.

    stalledModules -- dict mapping module name to its list of stall times in
                      milliseconds. Each list is sorted in place, largest
                      first.

    Times are printed in seconds.
    """
    priorities = []
    maxNameSize = 0
    for name,t in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        t.sort(reverse=True)
        priorities.append((name,sum(t),t))

    # Sort on the total with a key function; the cmp builtin and the cmp=
    # argument do not exist on Python 3.
    priorities.sort(key=itemgetter(1), reverse=True)

    nameColumn = "Stalled Module"
    maxNameSize = max(maxNameSize, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
    for n,s,t in priorities:
        paddedName = "%-*s:" % (maxNameSize,n)
        print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
436 
437 #--------------------------------------------------------
class Point:
    """A mutable (x, y) pair; here x is a time and y a change in height."""

    def __init__(self, x_, y_):
        self.x = x_
        self.y = y_

    def __str__(self):
        return "(x: {}, y: {})".format(self.x,self.y)

    def __repr__(self):
        return self.__str__()
448 
449 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge points that share the same x and drop those whose y sums to 0.

    ps -- list of Point objects already sorted by x

    A list with fewer than two points is returned unchanged (same object).
    Otherwise a new list of new Point objects is returned.
    """
    if len(ps) < 2:
        return ps
    reducedPoints = []
    tmp = Point(ps[0].x, ps[0].y)
    for p in ps[1:]:
        if tmp.x == p.x:
            tmp.y += p.y
        else:
            reducedPoints.append(tmp)
            tmp = Point(p.x, p.y)
    reducedPoints.append(tmp)
    return [p for p in reducedPoints if p.y != 0]
464 
465 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (begin, length) pairs into +1/-1 step points sorted by x.

    Every pair with a non-zero length contributes Point(begin, 1) and
    Point(begin+length, -1).
    """
    points = []
    for pairList in pairLists:
        points += [Point(x[0], 1) for x in pairList if x[1] != 0]
        points += [Point(sum(x),-1) for x in pairList if x[1] != 0]
    points.sort(key=attrgetter('x'))
    return points
473 
stackType = 'stack'

# --------------------------------------------
class Stack:
    """Accumulates step points so that each layer sits on top of the previous ones.

    data -- list of (graphType, points) tuples in the order they were added
    """

    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Add 'points' on top of the last stored layer and store the result.

        The caller's list is left untouched.
        """
        # Copy first: extending 'points' in place would modify the caller's list.
        merged = list(points)
        if self.data:
            merged.extend(self.data[-1][1])

        merged.sort(key=attrgetter('x'))
        merged = reduceSortedPoints(merged)
        self.data.append((graphType, merged))
489 
490 #---------------------------------------------
491 # StreamInfoElement
class StreamInfoElement:
    """One coloured block on a stream: start time, duration and colour."""

    def __init__(self, begin_, delta_, color_):
        self.begin=begin_
        self.delta=delta_
        self.color=color_

    def unpack(self):
        """Return (begin, delta, color)."""
        return self.begin, self.delta, self.color
500 
501 #----------------------------------------------
502 # Consolidating contiguous blocks with the same color
503 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge neighbouring blocks of the same colour on each stream.

    numStreams -- number of streams
    streamInfo -- per-stream lists of StreamInfoElement

    Two consecutive blocks are merged when they have the same colour and the
    second starts exactly where the first ends. Returns new per-stream lists;
    the input lists are not modified.
    """
    consolidated = [[] for _ in range(numStreams)]

    for s in range(numStreams):
        blocks = streamInfo[s]
        if not blocks:
            continue
        lastStartTime,lastTimeLength,lastColor = blocks[0].unpack()
        for info in blocks[1:]:
            start,length,color = info.unpack()
            if color == lastColor and lastStartTime+lastTimeLength == start:
                lastTimeLength += length
            else:
                consolidated[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))
                lastStartTime = start
                lastTimeLength = length
                lastColor = color
        consolidated[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))

    return consolidated
523 
524 #----------------------------------------------
525 # Consolidating contiguous blocks with the same color drastically
526 # reduces the size of the pdf file. Same functionality as the
527 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge neighbouring (start, length, height) blocks of equal height.

    Two consecutive blocks are merged when they have the same height and the
    second starts exactly where the first ends. Returns a new list; an empty
    input gives an empty list.
    """
    merged = []
    if not blocks:
        return merged

    lastStartTime,lastTimeLength,lastHeight = blocks[0]
    for start,length,height in blocks[1:]:
        if height == lastHeight and lastStartTime+lastTimeLength == start:
            lastTimeLength += length
        else:
            merged.append((lastStartTime,lastTimeLength,lastHeight))
            lastStartTime = start
            lastTimeLength = length
            lastHeight = height
    merged.append((lastStartTime,lastTimeLength,lastHeight))

    return merged
547 
548 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and/or record them for the stack plot.

    points          -- +1/-1 step Points for the stream (any order)
    allStackTimes   -- dict colour -> list of (begin, length); extended in place
    ax              -- axes object providing broken_barh
    stream          -- stream index (vertical position)
    height          -- height of one row
    streamHeightCut -- intervals with fewer concurrent modules are skipped
    doPlot          -- draw the bars on 'ax'
    addToStackTimes -- add the intervals to allStackTimes[color]
    color           -- bar colour and key into allStackTimes
    threadOffset    -- subtracted from the concurrency when counting how
                       often an interval is added to allStackTimes
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    for t1,t2 in zip(points, points[1:]):
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    # groupby below needs the blocks ordered by height
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            theTimes = [(t[0]/1000.,t[1]/1000.) for t in theTS]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
576 
577 #----------------------------------------------
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
    """Draw the stream stall graph and save it to 'pdfFile'.

    pdfFile             -- output file name passed to plt.savefig
    shownStacks         -- also draw the stacked '# modules' plot
    processingSteps     -- iterable of (name, transition, stream, time, isEvent)
    numStreams          -- number of streams in the job
    stalledModuleInfo   -- dict whose keys are the names of stalled modules
    displayExternalWork -- show external work (after an acquire) in the graph
    checkOrder          -- detect and drop transitions that arrive in the
                           wrong order

    Uses the module-level name 'plt' (matplotlib.pyplot), which the script
    imports before calling this function.
    """

    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for _ in range(numStreams)]
    modulesActiveOnStreams = [set() for _ in range(numStreams)]
    acquireActiveOnStreams = [set() for _ in range(numStreams)]
    externalWorkOnStreams = [set() for _ in range(numStreams)]
    previousFinishTime = [None for _ in range(numStreams)]
    streamRunningTimes = [[] for _ in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for _ in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for _ in range(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for _ in range(numStreams)]
    finishAcquireBeforeStart = [set() for _ in range(numStreams)]
    countSource = [0 for _ in range(numStreams)]
    countDelayedSource = [0 for _ in range(numStreams)]
    countExternalWork = [defaultdict(int) for _ in range(numStreams)]

    startTransitions = (kStarted, kStartedSourceDelayedRead, kStartedAcquire, kStartedSource)
    finishTransitions = (kFinished, kFinishedSourceDelayedRead, kFinishedAcquire, kFinishedSource)

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans in startTransitions:
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # Snapshot of what was running before this start
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans in finishTransitions:
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            # Snapshot of what was running up to this finish
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            elif not stalledModuleNames.isdisjoint(moduleNames):
                # at least one of the running modules is a stalled one
                c="red"
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(list(range(numStreams)))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000., x.delta/1000.) for x in lowestRow] # Scale from msec to sec.
        colors=[x.color for x in lowestRow]
        # for each stream, plot the lowest row
        ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            height = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000.,e[1]/1000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        # Boolean form: the 'on'/'off' strings are no longer accepted by matplotlib
        axStack.tick_params(top=False)

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
817 
818 #=======================================
if __name__=="__main__":
    # Script entry point: parse the options, read the log, then print the
    # ASCII graph or write the PDF, followed by the list of stalled modules.
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
 by itself, the default file name is \'stall.pdf\'. Otherwise, the
 argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
 Can be used only when -g is specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary as a yaml file module-timings.yaml.''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    # '-e' is a store_false option, so this is True unless -e was given
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = bool(args.timings)

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    # Tracer input always needs the transition order check
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.yaml\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def consolidateContiguousBlocks(numStreams, streamInfo)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def createModuleTiming(processingSteps, numStreams)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def processingStepsFromStallMonitorOutput(f, moduleNames)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
#define str(s)
double split
Definition: MVATrainer.cc:139
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run