test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
edmStreamStallGrapher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import re
3 import sys
4 
5 #----------------------------------------------
def printHelp():
    """Print usage instructions and an explanation of the script's output.

    Writes the help text to standard output and returns None.
    """
    s = """Purpose: Convert a cmsRun log with Tracer info into a stream stall graph.

edmStreamStallGrapher [-g[=arg]] <log file name>

Options: -g[=arg] instead of ascii art, create a pdf file of name
         'arg' showing the work being done on each stream. If '=arg'
         is not specified, the pdf file name is 'stall.pdf'. There
         can be no spaces before and after the '=' sign.

To Use:  Add the Tracer Service to the cmsRun job you want to check for
         stream stalls. Make sure to use the 'printTimestamps' option
            cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
         After running the job, execute this script and pass the name of the
         log file to the script as the last command line argument.
         StallMonitor output is accepted as well; the input format is
         detected from the first line of the log file.

To Read: The script will then print an 'ASCII art' stall graph which
         consists of the name of the module which either started or stopped
         running on a stream, and the number of modules running on each
         stream at that moment in time. If the module just started, you
         will also see the amount of time the module spent between finishing
         its prefetching and starting. The state of a module is represented
         by a symbol:

           plus  ("+") the stream has just finished waiting and is starting a module
           minus ("-") the stream just finished running a module
           star  ("*") marks the stream of a source event-finding step

         If a module had to wait more than 0.1 seconds, the end of the line
         will have "STALLED". Stalls are only marked this way once processing
         of event 5 has started. This ignores stalls caused by startup
         actions, e.g. reading conditions.

         Once the graph is completed, the program outputs the list of modules
         which had the greatest total stall times. The list is sorted by
         total stall time and written in descending order. In addition, the
         list of all stall times for the module is given. """
    print(s)
44 
45 
# Waits longer than this are reported as stalls.
kStallThreshold = 100  # in milliseconds
# Switched to True by chooseParser() when the input is Tracer output.
kTracerInput = False

# Stream states: the transition recorded for each processing step.
kStarted, kFinished, kPrefetchEnd = 0, 1, 2

# Special names used in place of module labels.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"
kFinishInit = "FINISH INIT"
58 
59 #----------------------------------------------
def parseStallMonitorOutput(f):
    """Parse StallMonitor output into a list of processing steps.

    Blank lines and lines starting with '#' are not steps. Among the latter,
    '#M <module id> <module label>' lines declare module labels. Every other
    line has the form '<step> <stream id> <..other fields..> <time>', where
    <time> is the time since the beginning of the job. Recognized steps:
      'E'/'e'  begin/end of event processing  -> kSourceFindEvent
      'p'      end of module prefetching       -> module label
      'M'/'m'  begin/end of module processing  -> module label
      'R'/'r'  begin/end of delayed read       -> kSourceDelayedRead
    Other step letters are ignored. When the 'E' step of event 5 is seen, a
    kFinishInit marker is recorded just before it.

    Args:
      f: open text file with StallMonitor output; it is closed on return.

    Returns:
      (processingSteps, numStreams, maxNameSize): the list of
      (name, transition, stream, time) tuples, the largest stream index seen
      and the length of the longest step name.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    foundEventToStartFrom = False
    moduleNames = {}
    for rawl in f:
        l = rawl.strip()
        if not l or l[0] == '#':
            # '#M <module id> <module label>' declares a module label.
            if len(l) > 5 and l[0:2] == "#M":
                fields = l[2:].split()
                if len(fields) >= 2:
                    moduleNames[fields[0]] = fields[1]
            continue
        fields = l.split(None, 1)
        if len(fields) < 2:
            # A step without payload has neither stream nor time.
            continue
        step = fields[0]
        payload = fields[1].split()

        # Payload format is:
        # <stream id> <..other fields..> <time since begin job>
        stream = int(payload[0])
        time = int(payload[-1])
        numStreams = max(numStreams, stream)

        if not foundEventToStartFrom:
            # Event number is second from the end for the 'E' step
            if step == 'E' and len(payload) >= 2 and payload[-2] == '5':
                foundEventToStartFrom = True
                processingSteps.append((kFinishInit, kFinished, stream, time))

        if step in ('E', 'e'):
            # The start of an event is the end of the framework part
            name = kSourceFindEvent
            trans = kStarted if step == 'E' else kFinished
        elif step in ('p', 'M', 'm'):
            # moduleID is the second payload field for module steps; fall
            # back to the raw ID when no '#M' line declared a label for it.
            moduleID = payload[1]
            name = moduleNames.get(moduleID, moduleID)
            trans = {'p': kPrefetchEnd, 'M': kStarted, 'm': kFinished}[step]
        elif step in ('R', 'r'):
            # Delayed read from source
            name = kSourceDelayedRead
            trans = kStarted if step == 'R' else kFinished
        else:
            # Unknown step: skip it rather than re-use the name and
            # transition left over from the previous line.
            continue

        maxNameSize = max(maxNameSize, len(name))
        processingSteps.append((name, trans, stream, time))

    f.close()
    return (processingSteps, numStreams, maxNameSize)
130 
131 #----------------------------------------------
def getTime(line):
    """Return the time-of-day stamp of a Tracer line in milliseconds.

    The stamp is the field at index 1 when the line is split on single
    spaces and has the form HH:MM:SS[.fff]; only the first three
    ':'-separated parts are used.
    """
    stamp = line.split(" ")[1].split(":")
    hours = int(stamp[0])
    minutes = int(stamp[1])
    seconds = float(stamp[2])
    secondsOfDay = hours*60*60 + minutes*60 + seconds
    return 1000*secondsOfDay
138 
139 #----------------------------------------------
def _tracerStream(line):
    """Return the stream index written as 'stream = <n>' in a Tracer line."""
    return int(re.search(r"stream = (\d+)", line).group(1))


def parseTracerOutput(f):
    """Parse Tracer output (printed with timestamps) into processing steps.

    Recognized lines:
      * 'processing event :' -> kSourceFindEvent. A line containing
        'starting:' is a start; any other is a finish.
      * 'processing event for module' -> the module label, taken as the first
        single-quoted token. 'finished:' plus 'prefetching' is a prefetch
        end, and any other 'finished:' line is a finish. Prefetch starts are
        skipped; everything else is a start.
      * 'event delayed read from source' -> kSourceDelayedRead.
    The first line containing 'event = 5' additionally yields a kFinishInit
    marker. Times are in milliseconds relative to the first recognized line.

    Args:
      f: open text file with Tracer output; it is closed on return.

    Returns:
      (processingSteps, numStreams, maxNameSize): the list of
      (name, transition, stream, time) tuples, the largest stream index seen
      and the length of the longest step name.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    foundEventToStartFrom = False
    for l in f:
        if not foundEventToStartFrom and "event = 5" in l:
            foundEventToStartFrom = True
            # Parse the full stream number (it may have several digits).
            processingSteps.append(
                (kFinishInit, kFinished, _tracerStream(l), getTime(l) - startTime))

        isEvent = "processing event :" in l
        isModule = "processing event for module" in l
        isDelayedRead = "event delayed read from source" in l
        if not (isEvent or isModule or isDelayedRead):
            continue

        # All times are made relative to the first recognized line.
        time = getTime(l)
        if startTime == 0:
            startTime = time
        time = time - startTime

        if isEvent:
            name = kSourceFindEvent
            # the start of an event is the end of the framework part
            trans = kStarted if "starting:" in l else kFinished
        elif isModule:
            if "finished:" in l:
                trans = kPrefetchEnd if "prefetching" in l else kFinished
            elif "prefetching" in l:
                # skip this since we don't care about prefetch starts
                continue
            else:
                trans = kStarted
            name = l.split("'")[1]
        else:
            name = kSourceDelayedRead
            trans = kFinished if "finished:" in l else kStarted

        stream = _tracerStream(l)
        maxNameSize = max(maxNameSize, len(name))
        processingSteps.append((name, trans, stream, time))
        numStreams = max(numStreams, stream)
    f.close()
    return (processingSteps, numStreams, maxNameSize)
212 
213 
214 #----------------------------------------------
def chooseParser(inputFile):
    """Return the parsing function that matches the format of *inputFile*.

    The format is detected from the first line of the file. A line that
    contains '# Step' means StallMonitor output. A line that contains '++'
    means Tracer output, and in that case the module-level flag kTracerInput
    is set to True. The file is rewound to its beginning so the parser reads
    it from the start. On an unknown format the file is closed, an error is
    written to stderr and the process exits with status 1.
    """
    global kTracerInput
    firstLine = inputFile.readline().rstrip()
    inputFile.seek(0)  # Rewind back to beginning

    if "# Step" in firstLine:
        print("> ... Parsing StallMonitor output.")
        return parseStallMonitorOutput
    if "++" in firstLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return parseTracerOutput

    inputFile.close()
    sys.stderr.write("Unknown input format.\n")
    # sys.exit is always available; the bare exit() builtin comes from site.
    sys.exit(1)
231 
232 #----------------------------------------------
def readLogFile(fileName):
    """Open *fileName*, detect its format and return the parsed result.

    Returns the (processingSteps, numStreams, maxNameSize) tuple produced by
    the parser chosen by chooseParser(). The file is closed even when
    parsing raises; closing it again after a parser already did is harmless.
    """
    with open(fileName, "r") as f:
        parseInput = chooseParser(f)
        return parseInput(f)
237 
238 #----------------------------------------------
def findStalledModules(processingSteps, numStreams):
    """Collect, per step name, every wait time longer than kStallThreshold.

    Patterns:

    source: The source just records how long it spent doing work, not how
      long it was stalled. We can get a lower bound on the stall time by
      measuring the time the stream was doing no work up till the source
      was run.
    modules: The time between prefetch finished and 'start processing' is
      the time it took to acquire any resources.

    Args:
      processingSteps: iterable of (name, transition, stream, time) tuples.
      numStreams: largest stream index appearing in processingSteps.

    Returns:
      dict mapping a step name to the list of its wait times (same units as
      the step times) that exceeded kStallThreshold, in encounter order.
    """
    lastFinishOnStream = [0 for _ in range(numStreams + 1)]
    prefetchedOnStream = [dict() for _ in range(numStreams + 1)]
    # Names that never went through a prefetch-end, so nothing to remove.
    untrackedOnFinish = (kSourceDelayedRead, kSourceFindEvent, kFinishInit)
    stalls = {}

    for name, transition, stream, when in processingSteps:
        pending = prefetchedOnStream[stream]
        wait = None
        if transition == kPrefetchEnd:
            pending[name] = when
        elif transition == kStarted:
            if name in pending:
                wait = when - pending[name]
            # A delayed read on an otherwise idle stream waited since the
            # stream's last finished step.
            if name == kSourceDelayedRead and not pending:
                wait = when - lastFinishOnStream[stream]
        elif transition == kFinished:
            if name not in untrackedOnFinish:
                del pending[name]
            lastFinishOnStream[stream] = when

        if wait is not None and wait > kStallThreshold:
            stalls.setdefault(name, []).append(wait)
    return stalls
272 
273 
274 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print an 'ASCII art' line for every start and finish step.

    Each line holds:
      * the step name padded to maxNameSize, followed by ': ';
      * '+ ' for a start or '- ' for a finish;
      * for every stream, the number of non-source steps currently running
        on it, with '*' in the column of a sourceFindEvent step's stream.
    Prefetch-end steps and the kFinishInit marker produce no line.

    For starts, the wait in seconds is appended. For a module, this is the
    time since its prefetching ended. For a delayed read on a stream with no
    prefetched-but-unfinished module, it is the time since the stream's last
    finish. If that wait exceeds kStallThreshold and kFinishInit has already
    been seen, 'STALLED <time in s> <stream>' is appended as well.

    Returns None; all output goes to standard output.
    """
    streamTime = [0] * (numStreams + 1)
    streamState = [0] * (numStreams + 1)
    modulesActiveOnStreams = [{} for _ in range(numStreams + 1)]
    seenInit = False
    for n, trans, s, time in processingSteps:
        if n == kFinishInit:
            # Stalls are only flagged once initialization is over.
            seenInit = True
            continue
        modulesActiveOnStream = modulesActiveOnStreams[s]
        waitTime = None
        if trans == kPrefetchEnd:
            modulesActiveOnStream[n] = time
            continue
        if trans == kStarted:
            if n != kSourceFindEvent:
                streamState[s] += 1
            if n in modulesActiveOnStream:
                waitTime = time - modulesActiveOnStream[n]
            # Same idle test as findStalledModules: checking streamState
            # here would always see this read's own increment.
            if n == kSourceDelayedRead and not modulesActiveOnStream:
                waitTime = time - streamTime[s]
        if trans == kFinished:
            if n != kSourceDelayedRead and n != kSourceFindEvent:
                del modulesActiveOnStream[n]
            if n != kSourceFindEvent:
                streamState[s] -= 1
            streamTime[s] = time

        states = "%-*s: " % (maxNameSize, n)
        if trans == kStarted:
            states += "+ "
        if trans == kFinished:
            states += "- "
        for index, state in enumerate(streamState):
            if n == kSourceFindEvent and index == s:
                states += "* "
            else:
                states += str(state) + " "
        if waitTime is not None:
            states += " %.2f" % (waitTime / 1000.)
            if waitTime > kStallThreshold and seenInit:
                states += " STALLED " + str(time / 1000.) + " " + str(s)

        print(states)
320 
321 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules ordered by total stall time.

    Args:
      stalledModules: dict mapping a module name to the list of its stall
        times in milliseconds. Each list is sorted in place, largest first.

    The output starts with a header line. It is followed by one row per
    module, largest total first. Each row holds the name, the total stall
    time and the individual stall times, all printed in seconds with two
    decimals.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    rows = []
    for name, times in stalledModules.items():
        times.sort(reverse=True)
        rows.append((name, sum(times), times))
    # Stable sort: modules with equal totals keep their dictionary order.
    rows.sort(key=lambda row: row[1], reverse=True)

    nameWidth = max([len(nameColumn)] + [len(name) for name, _, _ in rows])

    print("%-*s %-*s  Stall Times" % (nameWidth, nameColumn, stallColumnLength, stallColumn))
    for name, total, times in rows:
        individual = ", ".join(["%.2f" % (x / 1000.) for x in times])
        print("%-*s: %-*.2f %s" % (nameWidth, name, stallColumnLength, total / 1000., individual))
347 
348 #----------------------------------------------
def consolidateContiguousBlocks(numStreams, streamTimes, streamColors):
    """Merge adjacent, touching spans of the same color on each stream.

    Consolidating contiguous blocks with the same color drastically reduces
    the size of the pdf file.

    Args:
      numStreams: largest stream index; streams 0..numStreams are processed.
      streamTimes: per stream, a list of (start, length) spans in time order.
      streamColors: per stream, the color of each span in streamTimes.

    Returns:
      (streamTimes, streamColors) as new lists. In them, consecutive spans
      with equal colors where one ends exactly where the next starts are
      merged into one. A stream without spans yields empty lists instead of
      raising IndexError.
    """
    newTimes = [[] for _ in range(numStreams + 1)]
    newColors = [[] for _ in range(numStreams + 1)]

    for s in range(numStreams + 1):
        spans = streamTimes[s]
        colors = streamColors[s]
        if not spans:
            # A stream that never recorded any work has nothing to merge.
            continue
        lastStart, lastLength = spans[0]
        lastColor = colors[0]
        for (start, length), color in zip(spans[1:], colors[1:]):
            if color == lastColor and lastStart + lastLength == start:
                lastLength += length
            else:
                newTimes[s].append((lastStart, lastLength))
                newColors[s].append(lastColor)
                lastStart, lastLength, lastColor = start, length, color
        newTimes[s].append((lastStart, lastLength))
        newColors[s].append(lastColor)

    return (newTimes, newColors)
376 
377 #----------------------------------------------
def createPDFImage(pdfFile, processingSteps, numStreams, stalledModuleInfo):
    """Draw per-stream activity as a broken bar chart and save it as a PDF.

    Each stream gets a row of time spans, in seconds, colored:
      green  - modules running,
      red    - at least one module named in stalledModuleInfo running,
      orange - the source (sourceFindEvent or sourceDelayedRead) working.
    When more than one module ran at once on some stream, blue bars are
    drawn on top. Their height shows the number of concurrently running
    modules.

    Args:
      pdfFile: name of the PDF file to write.
      processingSteps: iterable of (name, transition, stream, time) tuples,
        with times in milliseconds.
      numStreams: largest stream index appearing in processingSteps.
      stalledModuleInfo: dict whose keys are the names of stalled modules.
    """
    # Need to force display since problems with CMSSW matplotlib.
    import matplotlib
    matplotlib.use("PDF")
    import matplotlib.pyplot as plt

    def closeOpenOverlap(overlaps, time):
        # End the most recent multiple-modules interval if still open.
        if overlaps and overlaps[-1][2] is None:
            overlaps[-1][2] = time

    stalledModuleNames = set(stalledModuleInfo)

    streamTimes = [[] for _ in range(numStreams + 1)]
    streamColors = [[] for _ in range(numStreams + 1)]
    modulesActiveOnStreams = [{} for _ in range(numStreams + 1)]
    streamLastEventEndTimes = [None] * (numStreams + 1)
    # Per stream: [number of running modules, start time, end time] records.
    streamMultipleModulesRunningTimes = [[] for _ in range(numStreams + 1)]
    maxNumberOfConcurrentModulesOnAStream = 0
    streamInvertedMessageFromModule = [set() for _ in range(numStreams + 1)]

    for n, trans, s, time in processingSteps:
        if n == kFinishInit:
            continue
        startTime = None
        if streamLastEventEndTimes[s] is None:
            streamLastEventEndTimes[s] = time
        if trans == kStarted:
            if n == kSourceFindEvent:
                # We assume the time from the end of the last event
                # for a stream until the start of a new event for that
                # stream is taken up by the source.
                startTime = streamLastEventEndTimes[s]
                # One-element set; set(n) would hold the name's characters.
                moduleNames = set([n])
            else:
                activeModules = modulesActiveOnStreams[s]
                moduleNames = set(activeModules)
                if n in streamInvertedMessageFromModule[s] and kTracerInput:
                    # This is the rare case where a finished message
                    # is issued before the corresponding started.
                    streamInvertedMessageFromModule[s].remove(n)
                    continue
                activeModules[n] = time
                nModulesRunning = len(activeModules)
                maxNumberOfConcurrentModulesOnAStream = max(
                    maxNumberOfConcurrentModulesOnAStream, nModulesRunning)
                if nModulesRunning > 1:
                    overlaps = streamMultipleModulesRunningTimes[s]
                    closeOpenOverlap(overlaps, time)
                    overlaps.append([nModulesRunning, time, None])
                # Need to create a new time span to avoid overlaps in graph.
                startTime = min(activeModules.values())
                for k in activeModules:
                    activeModules[k] = time

        if trans == kFinished:
            if n == kSourceFindEvent:
                streamLastEventEndTimes[s] = time
            else:
                activeModules = modulesActiveOnStreams[s]
                if n not in activeModules and kTracerInput:
                    # This is the rare case where a finished message
                    # is issued before the corresponding started.
                    streamInvertedMessageFromModule[s].add(n)
                    continue
                startTime = activeModules[n]
                moduleNames = set(activeModules)
                del activeModules[n]
                nModulesRunning = len(activeModules)
                if nModulesRunning > 0:
                    overlaps = streamMultipleModulesRunningTimes[s]
                    closeOpenOverlap(overlaps, time)
                    if nModulesRunning > 1:
                        # A smaller group keeps overlapping from now on.
                        overlaps.append([nModulesRunning, time, None])
                    # Reset start time for remaining modules to this time
                    # to avoid overlapping time ranges when making the plot.
                    for k in activeModules:
                        activeModules[k] = time

        if startTime is not None:
            c = "green"
            if (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            if not stalledModuleNames.isdisjoint(moduleNames):
                c = "red"
            streamTimes[s].append((startTime, time - startTime))
            streamColors[s].append(c)

    (streamTimes, streamColors) = consolidateContiguousBlocks(numStreams, streamTimes, streamColors)

    fig, ax = plt.subplots()
    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")

    # Guard against division by zero when no module ever ran.
    height = 0.8 / max(1, maxNumberOfConcurrentModulesOnAStream)

    def scaleToSeconds(times):
        return [(x[0] / 1000., x[1] / 1000.) for x in times]

    for i, times in enumerate(streamTimes):
        ax.broken_barh(scaleToSeconds(times), (i - 0.4, height),
                       facecolors=streamColors[i], edgecolors=streamColors[i], linewidth=0)

    # Now superimpose the number of concurrently running modules on to the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1:
        for i, occurrences in enumerate(streamMultipleModulesRunningTimes):
            for info in occurrences:
                if info[2] is None:
                    continue
                times = (info[1], info[2] - info[1])
                ax.broken_barh(scaleToSeconds([times]), (i - 0.4 + height, height * (info[0] - 1)),
                               facecolors="blue", edgecolors="blue", linewidth=0)

    fig.text(0.1, 0.95, "modules running", color="green", horizontalalignment='left')
    fig.text(0.5, 0.95, "stalled module running", color="red", horizontalalignment='center')
    fig.text(0.9, 0.95, "read from input", color="orange", horizontalalignment='right')
    fig.text(0.5, 0.92, "multiple modules running", color="blue", horizontalalignment='center')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
489 
490 #=======================================
if __name__ == "__main__":
    import sys

    argc = len(sys.argv)
    if argc not in [2, 3]:
        sys.stderr.write("\n\033[1mERROR:\033[0m Wrong number of arguments specified ({}). Should be 2 or 3.\n\n".format(argc))
        printHelp()
        # A usage error is a failure, so report a non-zero exit status.
        sys.exit(1)

    doGraphic = False
    pdfFile = "stall.pdf"
    if argc == 3:
        arg = sys.argv[1]
        if arg == '-g':
            doGraphic = True
        elif arg.startswith("-g="):
            doGraphic = True
            # Keep everything after the first '=' so stray '=' are rejected below.
            pdfFile = arg.split('=', 1)[1]
            if not re.match(r'^[\w\.]+$', pdfFile):
                print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
                print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
                sys.exit(1)
        else:
            print("Unknown argument  " + sys.argv[1])
            sys.exit(1)

    fileName = sys.argv[-1]

    sys.stderr.write(">reading file\n")
    processingSteps, numStreams, maxNameSize = readLogFile(fileName)
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(processingSteps, numStreams)
    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(processingSteps, numStreams, maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, processingSteps, numStreams, stalledModules)
    printStalledModulesInOrder(stalledModules)
def parseInput
Definition: tools.py:122
boost::dynamic_bitset append(const boost::dynamic_bitset<> &bs1, const boost::dynamic_bitset<> &bs2)
this method takes two bitsets bs1 and bs2 and returns result of bs2 appended to the end of bs1 ...
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
T min(T a, T b)
Definition: MathUtil.h:58
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
double split
Definition: MVATrainer.cc:139