CMS 3D CMS Logo

edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from itertools import groupby
3 from operator import attrgetter,itemgetter
4 import sys
5 from collections import defaultdict
6 import six
7 
8 #----------------------------------------------
def printHelp():
    """Return the long usage text shown as the argparse epilog."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
 stream stalls. Use something like this in the configuration:

 process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

 After running the job, execute this script and pass the name of the
 StallMonitor log file to the script.

 By default, the script will then print an 'ASCII art' stall graph
 which consists of a line of text for each time a module or the
 source stops or starts. Each line contains the name of the module
 which either started or stopped running, and the number of modules
 running on each stream at that moment in time. After that will be
 the time and stream number. Then if a module just started, you
 will also see the amount of time the module spent between finishing
 its prefetching and starting. The state of a module is represented
 by a symbol:

 plus ("+") the stream has just finished waiting and is starting a module
 minus ("-") the stream just finished running a module

 If a module had to wait more than 0.1 seconds, the end of the line
 will have "STALLED". Startup actions, e.g. reading conditions,
 may affect results for the first few events.

 Using the command line arguments described above you can make the
 program create a PDF file with actual graphs instead of the 'ASCII art'
 output.

 Once the graph is completed, the program outputs the list of modules
 which had the greatest total stall times. The list is sorted by
 total stall time and written in descending order. In addition, the
 list of all stall times for the module is given.

 There is an inferior alternative (an old obsolete way).
 Instead of using the StallMonitor Service, you can use the
 Tracer Service. Make sure to use the 'printTimestamps' option
 cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
 There are problems associated with this and it is not recommended.'''
51 
kStallThreshold = 100  # in milliseconds
kTracerInput = False

# Stream transition states
kStarted = 0
kFinished = 1
kPrefetchEnd = 2
kStartedAcquire = 3
kFinishedAcquire = 4
kStartedSource = 5
kFinishedSource = 6
kStartedSourceDelayedRead = 7
kFinishedSourceDelayedRead = 8

# Special labels used in place of a module name for source transitions
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"

#----------------------------------------------
def processingStepsFromStallMonitorOutput(f, moduleNames):
    """Generate (name, transition, stream, time, isEvent) tuples from a
    StallMonitor log.

    f: iterable of log lines; moduleNames: dict mapping module id (string)
    to module label. Lines that are empty, comments ('#...'), or early
    termination markers ('E'/'e') are skipped. Implemented as a generator to
    keep the memory footprint small for large logs.
    """
    for rawLine in f:
        line = rawLine.strip()
        if not line or line.startswith('#'):
            continue
        step, remainder = line.split(None, 1)
        fields = remainder.split()

        # Ignore these
        if step in ('E', 'e'):
            continue

        # Payload format is:
        #   <stream id> <..other fields..> <time since begin job>
        streamID = int(fields[0])
        timestamp = int(fields[-1])
        transition = None
        isEvent = True
        label = None

        # 'S' = begin of event creation in source
        # 's' = end of event creation in source
        if step in ('S', 's'):
            label = kSourceFindEvent
            # the start of an event is the end of the framework part
            transition = kFinishedSource if step == 's' else kStartedSource
        else:
            # the module id is the second payload field for all other steps
            moduleID = fields[1]

            # 'p' = end of module prefetching
            # 'M' = begin of module processing
            # 'm' = end of module processing
            if step in ('p', 'M', 'm'):
                if step == 'p':
                    transition = kPrefetchEnd
                elif step == 'm':
                    transition = kFinished
                else:
                    transition = kStarted
                if step in ('m', 'M'):
                    # third payload field flags non-event (e.g. run/lumi) work
                    isEvent = (int(fields[2]) == 0)
                label = moduleNames[moduleID]

            # 'A' = begin of module acquire function
            # 'a' = end of module acquire function
            elif step in ('A', 'a'):
                transition = kFinishedAcquire if step == 'a' else kStartedAcquire
                label = moduleNames[moduleID]

            # 'R' = begin of delayed read from source
            # 'r' = end of delayed read from source
            elif step in ('R', 'r'):
                transition = kFinishedSourceDelayedRead if step == 'r' else kStartedSourceDelayedRead
                label = kSourceDelayedRead

        if transition is not None:
            yield (label, transition, streamID, timestamp, isEvent)

    return
137 
139  def __init__(self,f):
140  numStreams = 0
141  numStreamsFromSource = 0
142  moduleNames = {}
143  for rawl in f:
144  l = rawl.strip()
145  if l and l[0] == 'M':
146  i = l.split(' ')
147  if i[3] == '4':
148  #found global begin run
149  numStreams = int(i[1])+1
150  break
151  if numStreams == 0 and l and l[0] == 'S':
152  s = int(l.split(' ')[1])
153  if s > numStreamsFromSource:
154  numStreamsFromSource = s
155  if len(l) > 5 and l[0:2] == "#M":
156  (id,name)=tuple(l[2:].split())
157  moduleNames[id] = name
158  continue
159  self._f = f
160  if numStreams == 0:
161  numStreams = numStreamsFromSource +1
162  self.numStreams =numStreams
163  self._moduleNames = moduleNames
164  self.maxNameSize =0
165  for n in six.iteritems(moduleNames):
166  self.maxNameSize = max(self.maxNameSize,len(n))
167  self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
168 
169  def processingSteps(self):
170  """Create a generator which can step through the file and return each processing step.
171  Using a generator reduces the memory overhead when parsing a large file.
172  """
173  self._f.seek(0)
175 
176 #----------------------------------------------
177 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the 'HH:MM:SS.f' timestamp (second whitespace-separated token
    of a Tracer log line) converted to integer milliseconds."""
    hours, minutes, seconds = line.split(" ")[1].split(":")
    totalSeconds = int(hours)*3600 + int(minutes)*60 + float(seconds)
    return int(1000*totalSeconds)  # convert to milliseconds
184 
185 #----------------------------------------------
186 # The next function parses the Tracer output.
187 # Here are some differences to consider if you use Tracer output
188 # instead of the StallMonitor output.
189 # - The time in the text of the Tracer output is not as precise
190 # as the StallMonitor (.01 s vs .001 s)
191 # - The MessageLogger bases the time on when the message printed
192 # and not when it was initially queued up to print which smears
193 # the accuracy of the times.
194 # - Both of the previous things can produce some strange effects
195 # in the output plots.
196 # - The file size of the Tracer text file is much larger.
197 # - The CPU work needed to parse the Tracer files is larger.
198 # - The Tracer log file is expected to have "++" in the first
199 # or fifth line. If there are extraneous lines at the beginning
200 # you have to remove them.
201 # - The ascii printout out will have one extraneous line
202 # near the end for the SourceFindEvent start.
203 # - The only advantage I can see is that you have only
204 # one output file to handle instead of two, the regular
205 # log file and the StallMonitor output.
206 # We might should just delete the Tracer option because it is
207 # clearly inferior ...
def parseTracerOutput(f):
    """Parse Tracer-service log text into processing steps.

    Returns (processingSteps, numStreams, maxNameSize) where each step is a
    (name, transition, stream, time, isEvent) tuple with time in
    milliseconds relative to the first transition seen. Closes f when done.
    """
    steps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for line in f:
        transition = None
        name = None
        # The start/stop of the source is estimated from the end of the
        # previous event and the start of the next one. This is historical,
        # probably because the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if "processing event :" in line:
            name = kSourceFindEvent
            # the start of an event is the estimated end of the source
            transition = kFinishedSource if "starting:" in line else kStartedSource
        elif "processing event for module" in line:
            if "finished:" in line:
                transition = kPrefetchEnd if "prefetching" in line else kFinished
            else:
                if "prefetching" in line:
                    # skip this since we don't care about prefetch starts
                    continue
                transition = kStarted
            name = line.split("'")[1]
        elif "processing event acquire for module:" in line:
            transition = kFinishedAcquire if "finished:" in line else kStartedAcquire
            name = line.split("'")[1]
        elif "event delayed read from source" in line:
            transition = kFinishedSourceDelayedRead if "finished:" in line else kStartedSourceDelayedRead
            name = kSourceDelayedRead
        if transition is None:
            continue

        time = getTime(line)
        if startTime == 0:
            startTime = time
        time = time - startTime
        streamIndex = line.find("stream = ")
        stream = int(line[streamIndex+9:line.find(" ", streamIndex+10)])
        maxNameSize = max(maxNameSize, len(name))

        if transition == kFinishedSource and stream not in streamsThatSawFirstEvent:
            # This is wrong but there is no way to estimate the time better
            # because there is no previous event for the first event.
            steps.append((name, kStartedSource, stream, time, True))
            streamsThatSawFirstEvent.add(stream)

        steps.append((name, transition, stream, time, True))
        numStreams = max(numStreams, stream+1)

    f.close()
    return (steps, numStreams, maxNameSize)
269 
    def __init__(self,f):
        """Eagerly parse the whole Tracer log file f.

        parseTracerOutput closes f when finished.
        """
        self._processingSteps,self.numStreams,self.maxNameSize = parseTracerOutput(f)
    def processingSteps(self):
        """Return the list of (name, transition, stream, time, isEvent)
        tuples parsed in the constructor. Safe to call repeatedly."""
        return self._processingSteps
275 
276 #----------------------------------------------
def chooseParser(inputFile):
    """Peek at the first and fifth lines of inputFile and pick a parser.

    Returns the StallMonitorParser or TracerParser class (not an instance)
    and rewinds the file; exits the program if the format is unknown. Sets
    the global kTracerInput flag when Tracer output is detected.

    Fix: the prints were Python-2-only print statements; the single-argument
    print(...) form below behaves identically on Python 2 and 3.
    """
    firstLine = inputFile.readline().rstrip()
    for i in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if (firstLine.find("# Transition") != -1) or (firstLine.find("# Step") != -1):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if firstLine.find("++") != -1 or fifthLine.find("++") != -1:
        global kTracerInput
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser
    else:
        inputFile.close()
        print("Unknown input format.")
        exit(1)
298 
299 #----------------------------------------------
def readLogFile(inputFile):
    """Pick the parser class matching inputFile's format and instantiate it."""
    return chooseParser(inputFile)(inputFile)
303 
304 #----------------------------------------------
305 #
306 # modules: The time between prefetch finished and 'start processing' is
307 # the time it took to acquire any resources which is by definition the
308 # stall time.
309 #
310 # source: The source just records how long it spent doing work,
311 # not how long it was stalled. We can get a lower bound on the stall
312 # time for delayed reads by measuring the time the stream was doing
313 # no work up till the start of the source delayed read.
314 #
def findStalledModules(processingSteps, numStreams):
    """Return {moduleName: [stallTime, ...]} for stalls above kStallThreshold.

    A module's stall time is the gap between the end of its prefetch and its
    start (or acquire start). The source only records time spent working,
    so for delayed reads we use a lower bound: the time the stream was doing
    no work before the delayed read started. Times are in milliseconds.

    Fix: xrange is Python-2-only; range iterates identically here.
    """
    streamTime = [0]*numStreams          # per stream: time of last finish
    streamState = [0]*numStreams         # per stream: number of running modules
    stalledModules = {}
    modulesActiveOnStream = [{} for x in range(numStreams)]  # name -> prefetch-end time
    for n,trans,s,time,isEvent in processingSteps:

        waitTime = None
        modulesOnStream = modulesActiveOnStream[s]
        if trans == kPrefetchEnd:
            modulesOnStream[n] = time
        elif trans == kStarted or trans == kStartedAcquire:
            if n in modulesOnStream:
                waitTime = time - modulesOnStream[n]
            modulesOnStream.pop(n, None)
            streamState[s] += 1
        elif trans == kFinished or trans == kFinishedAcquire:
            streamState[s] -= 1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            # only a stall if nothing else was running on the stream
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        if waitTime is not None and waitTime > kStallThreshold:
            stalledModules.setdefault(n, []).append(waitTime)
    return stalledModules
346 
347 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one line of 'ASCII art' per transition.

    Each line shows the module name, '+' (start) or '-' (finish), the count
    of running modules on every stream ('*' marks the source's own stream),
    the time in seconds, the stream number, and — for starts — the wait
    time, flagged STALLED when it exceeds kStallThreshold.

    Fix: xrange and the bare print statement were Python-2-only; range and
    the single-argument print(...) call behave identically on both.
    """
    streamTime = [0]*numStreams
    streamState = [0]*numStreams
    modulesActiveOnStreams = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        modulesActiveOnStream = modulesActiveOnStreams[s]
        if trans == kPrefetchEnd:
            modulesActiveOnStream[n] = time
            continue
        elif trans == kStartedAcquire or trans == kStarted:
            if n in modulesActiveOnStream:
                waitTime = time - modulesActiveOnStream[n]
            modulesActiveOnStream.pop(n, None)
            streamState[s] += 1
        elif trans == kFinishedAcquire or trans == kFinished:
            streamState[s] -= 1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            modulesActiveOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        states = "%-*s: " % (maxNameSize,n)
        if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
            states += "+ "
        else:
            states += "- "
        for index, state in enumerate(streamState):
            if n == kSourceFindEvent and index == s:
                states += "* "
            else:
                states += str(state)+" "
        states += " -- " + str(time/1000.) + " " + str(s) + " "
        if waitTime is not None:
            states += " %.2f" % (waitTime/1000.)
            if waitTime > kStallThreshold:
                states += " STALLED"

        print(states)
390 
391 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print modules sorted by decreasing total stall time.

    stalledModules: dict mapping module name -> list of stall times in
    milliseconds. Each list is sorted in place (descending) and all times
    are printed in seconds.

    Fixes for Python 2/3 compatibility with identical output: six.iteritems
    -> items(); the removed cmp-based sort is replaced by an equivalent
    key-based descending sort; the multi-argument print statements are
    rebuilt as single formatted strings (Python 2 print inserted one space
    between arguments, reproduced literally below).
    """
    priorities = []
    maxNameSize = 0
    for name,t in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        t.sort(reverse=True)
        priorities.append((name,sum(t),t))

    # largest total stall time first (stable, same order as the old cmp sort)
    priorities.sort(key=itemgetter(1), reverse=True)

    nameColumn = "Stalled Module"
    maxNameSize = max(maxNameSize, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    print("%-*s %-*s  Stall Times" % (maxNameSize, nameColumn, stallColumnLength, stallColumn))
    for n,s,t in priorities:
        paddedName = "%-*s:" % (maxNameSize,n)
        print("%s %-*.2f %s" % (paddedName, stallColumnLength, s/1000., ", ".join(["%.2f" % (x/1000.) for x in t])))
414 
415 #--------------------------------------------------------
class Point:
    """A mutable 2D point: x is a time (ms), y a concurrency increment."""

    def __init__(self, x_, y_):
        self.x = x_
        self.y = y_

    def __str__(self):
        return "(x: {}, y: {})".format(self.x, self.y)

    __repr__ = __str__

#--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge points sharing the same x (summing their y values) and drop
    points whose merged y is zero. ps must already be sorted by x."""
    if len(ps) < 2:
        return ps
    merged = []
    accumulator = Point(ps[0].x, ps[0].y)
    for point in ps[1:]:
        if accumulator.x == point.x:
            accumulator.y += point.y
        else:
            merged.append(accumulator)
            accumulator = Point(point.x, point.y)
    merged.append(accumulator)
    return [p for p in merged if p.y != 0]

# -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (begin, width) pairs into +1 Points at interval starts and -1
    Points at interval ends, sorted by x. Zero-width pairs are skipped."""
    edges = []
    for pairs in pairLists:
        edges.extend(Point(pair[0], 1) for pair in pairs if pair[1] != 0)
        edges.extend(Point(sum(pair), -1) for pair in pairs if pair[1] != 0)
    edges.sort(key=attrgetter('x'))
    return edges
451 
stackType = 'stack'

# --------------------------------------------
class Stack:
    """Builds cumulative per-color point lists for the stacked plot.

    self.data is a list of (graphType, points) tuples. Each update() folds
    the previous entry's points into the new one, so entries are cumulative
    and can be drawn back-to-front.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        # NOTE(review): 'tmp' aliases the caller's list, so the '+=' below
        # also extends the caller's 'points' in place — presumably harmless
        # since the caller passes a throwaway local, but verify before
        # reusing this class elsewhere.
        tmp = points
        if len(self.data) != 0:
            tmp += self.data[-1][1]

        tmp.sort(key=attrgetter('x'))
        tmp = reduceSortedPoints(tmp)
        self.data.append((graphType, tmp))
467 
#---------------------------------------------
# StreamInfoElement
class StreamInfoElement(object):
    """One colored block on a stream's timeline: begin time, duration, color."""

    def __init__(self, begin_, delta_, color_):
        self.begin = begin_
        self.delta = delta_
        self.color = color_

    def unpack(self):
        """Return (begin, delta, color) as a tuple."""
        return self.begin, self.delta, self.color
478 
479 #----------------------------------------------
480 # Consolidating contiguous blocks with the same color
481 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge touching same-color StreamInfoElements on each stream.

    Consolidating contiguous blocks with the same color drastically reduces
    the size of the pdf file. Returns a new per-stream list of elements;
    streams with no elements stay empty.

    Fix: xrange is Python-2-only; range iterates identically here.
    """
    oldStreamInfo = streamInfo
    streamInfo = [[] for x in range(numStreams)]

    for s in range(numStreams):
        if oldStreamInfo[s]:
            lastStartTime,lastTimeLength,lastColor = oldStreamInfo[s][0].unpack()
            for info in oldStreamInfo[s][1:]:
                start,length,color = info.unpack()
                if color == lastColor and lastStartTime+lastTimeLength == start:
                    # same color and exactly adjacent in time: extend block
                    lastTimeLength += length
                else:
                    streamInfo[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))
                    lastStartTime = start
                    lastTimeLength = length
                    lastColor = color
            # flush the last accumulated block
            streamInfo[s].append(StreamInfoElement(lastStartTime,lastTimeLength,lastColor))

    return streamInfo
501 
502 #----------------------------------------------
503 # Consolidating contiguous blocks with the same color drastically
504 # reduces the size of the pdf file. Same functionality as the
505 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge adjacent (start, length, height) tuples with equal height.

    Same consolidation idea as consolidateContiguousBlocks, but for plain
    tuples: consecutive entries whose height matches and whose start equals
    the previous end are fused into one tuple.
    """
    if not blocks:
        return []

    merged = []
    currentStart, currentLength, currentHeight = blocks[0]
    for start, length, height in blocks[1:]:
        if height == currentHeight and currentStart + currentLength == start:
            currentLength += length
        else:
            merged.append((currentStart, currentLength, currentHeight))
            currentStart, currentLength, currentHeight = start, length, height
    merged.append((currentStart, currentLength, currentHeight))

    return merged
525 
526 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot one stream's rows above the first and/or collect stack-plot data.

    points: Point objects for one stream whose y values are +1/-1
    concurrency increments. Intervals where the running sum reaches at
    least streamHeightCut are drawn on ax (if doPlot) and appended to
    allStackTimes[color] (if addToStackTimes).
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    for t1,t2 in zip(points, points[1:]):
        # running sum of +1/-1 increments = concurrency between t1 and t2
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    # sort by concurrency level so groupby below sees each level once
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            theTimes = [(t[0]/1000.,t[1]/1000.) for t in theTS]  # msec -> sec
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            # replicate each interval (nthreads - threadOffset) times so the
            # stack plot counts one entry per module above the offset
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
554 
555 #----------------------------------------------
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder):
    """Create the PDF stall graph (and optional stack plot) and save it.

    pdfFile: output file name; shownStacks: also draw the stacked
    module-count plot; processingSteps: iterable of (name, transition,
    stream, time, isEvent) tuples; stalledModuleInfo: dict from
    findStalledModules; displayExternalWork: show external-work intervals;
    checkOrder: repair out-of-order transitions.

    NOTE(review): relies on the module-global 'plt' created by the
    'import matplotlib.pyplot as plt' inside the __main__ block — this
    function can only run after that import has happened.
    """
    stalledModuleNames = set([x for x in stalledModuleInfo.iterkeys()])
    streamLowestRow = [[] for x in xrange(numStreams)]
    modulesActiveOnStreams = [set() for x in xrange(numStreams)]
    acquireActiveOnStreams = [set() for x in xrange(numStreams)]
    externalWorkOnStreams = [set() for x in xrange(numStreams)]
    previousFinishTime = [None for x in xrange(numStreams)]
    streamRunningTimes = [[] for x in xrange(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in xrange(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in xrange(numStreams)]

    # The next five variables are only used to check for out of order transitions
    finishBeforeStart = [set() for x in xrange(numStreams)]
    finishAcquireBeforeStart = [set() for x in xrange(numStreams)]
    countSource = [0 for x in xrange(numStreams)]
    countDelayedSource = [0 for x in xrange(numStreams)]
    countExternalWork = [defaultdict(int) for x in xrange(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -= timeOffset
        # force the time to monotonically increase on each stream
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
            if checkOrder:
                # Note that the code which checks the order of transitions assumes that
                # all the transitions exist in the input. It is checking only for order
                # problems, usually a start before a finish. Problems are fixed and
                # silently ignored. Nothing gets plotted for transitions that are
                # in the wrong order.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # snapshot of what was already running before this start
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                # this start ends the module's external-work interval
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            # pick the color of the lowest-row block that just ended
            c = "green"
            if not isEvent:
                c = "limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                for n in moduleNames:
                    if n in stalledModuleNames:
                        c = "red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(xrange(numStreams))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes = {'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times = [(x.begin/1000., x.delta/1000.) for x in lowestRow] # Scale from msec to sec.
        colors = [x.color for x in lowestRow]
        # for each stream, plot the lowest row
        ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # record them also for inclusion in the stack plot
        # the darkviolet ones get counted later so do not count them here
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=True,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print "> ... Generating stack"
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Now arrange list in a manner that it can be grouped by the height of the block
            height = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000.,e[1]/1000.) for e in xpairs]
                # plot the stacked plot, one color and one height on each call to broken_barh
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)");
        axStack.set_ylabel("# modules");
        axStack.set_xlim(ax.get_xlim())
        axStack.tick_params(top='off')

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print "> ... Saving to file: '{}'".format(pdfFile)
    plt.savefig(pdfFile)
795 
796 #=======================================
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
by itself, the default file name is \'stall.pdf\'. Otherwise, the
argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
Can be used only when -g is specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    displayExternalWork = args.external
    checkOrder = args.order

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        # Fix: the prints below were Python-2-only statements; the
        # single-argument print(...) form is identical on Python 2 and 3.
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if not extension in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        # Tracer timestamps are imprecise, so order repair is mandatory
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder)
    printStalledModulesInOrder(stalledModules)
def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder)
def update(self, graphType, points)
def findStalledModules(processingSteps, numStreams)
def consolidateContiguousBlocks(numStreams, streamInfo)
def __init__(self, begin_, delta_, color_)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def printStalledModulesInOrder(stalledModules)
def createAsciiImage(processingSteps, numStreams, maxNameSize)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def processingStepsFromStallMonitorOutput(f, moduleNames)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:211
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset)
#define str(s)
double split
Definition: MVATrainer.cc:139
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run