CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
cmsPerfClient.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import socket, xml, xmlrpclib, os, sys, threading, Queue, time, random, pickle, exceptions
3 import optparse as opt
4 #Documentation needs to follow... but for now just know that
5 #a template file for cmsPerfClient.py -f option is BencmarkCfg.py in Validation/Performance/python dir.
# Name this script was invoked as; used in the usage message.
PROG_NAME = os.path.basename(sys.argv[0])

# Keys that may appear in the dictionaries of a configuration file
# (each dictionary is one set of cmsPerfSuite keyword arguments).
validPerfSuitKeys = [
    "castordir",
    "perfsuitedir",
    "TimeSizeEvents",
    "TimeSizeCandles",
    "IgProfEvents",
    "IgProfCandles",
    "CallgrindEvents",
    "CallgrindCandles",
    "MemcheckEvents",
    "MemcheckCandles",
    "cmsScimark",
    "cmsScimarkLarge",
    "cmsdriverOptions",
    "stepOptions",
    "quicktest",
    "profilers",
    "cpus",
    "cores",
    "prevrel",
    "isAllCandles",
    "candles",
    "bypasshlt",
    "runonspare",
    "logfile",
]
11 #################
12 #
13 # Option parser
14 # returns : Command set to run on each (or all) machines, port to connect to server,
15 # List of machines to connect to, File to pickle results to,
16 # Dictionary to index which command set to use for which machine
def optionparse():
    """Parse and validate the command line of the benchmarking client.

    Reads the options from ``sys.argv`` and checks, in order: the output
    file location, every configuration file given with ``-f``, the port,
    and that every machine given with ``-m`` resolves.

    Returns:
        A tuple ``(cmsperf_cmds, port, machines, outfile, cmdindex)``:

        * ``cmsperf_cmds`` -- one entry per configuration file, each the
          ``listperfsuitekeywords`` list defined by that file.
        * ``port`` -- port to connect to (8000 when ``-p`` is not given).
        * ``machines`` -- the stripped machine names.
        * ``outfile`` -- absolute path of the file to pickle results to.
        * ``cmdindex`` -- maps each machine to its index in
          ``cmsperf_cmds``.

    Every validation failure is reported through ``parser.error``, which
    prints the usage message and terminates the program, so no explicit
    ``sys.exit()`` is needed after it.
    """

    def _isValidPerfCmdsDef(alist):
        """Return True if every item of ``alist`` is a well-formed dict.

        Each item must be exactly a ``dict`` whose keys all appear in
        ``validPerfSuitKeys`` and whose values have exactly the expected
        type (subclasses are not accepted, so a bool is not an int).
        """
        expected_types = {
            "cpus": str,  # has to be a string not a list!
            "cores": str,
            "castordir": str,
            "perfsuitedir": str,
            "TimeSizeEvents": int,
            "TimeSizeCandles": str,
            "CallgrindEvents": int,
            "CallgrindCandles": str,
            "IgProfEvents": int,
            "IgProfCandles": str,
            "MemcheckEvents": int,
            "MemcheckCandles": str,
            "cmsScimark": int,
            "cmsScimarkLarge": int,
            "cmsdriverOptions": str,
            "stepOptions": str,
            "quicktest": bool,
            "profilers": str,
            "prevrel": str,
            "isAllCandles": bool,
            "candles": str,  # has to be a string not a list!
            "bypasshlt": bool,
            "runonspare": bool,
            "logfile": str,
        }
        for item in alist:
            if type(item) is not dict:
                return False
            for key in item:
                if key not in validPerfSuitKeys:
                    return False
                expected = expected_types.get(key)
                if expected is not None and type(item[key]) is not expected:
                    return False
        return True

    parser = opt.OptionParser(usage=("""%s [Options]""" % PROG_NAME))

    parser.add_option('-p',
                      '--port',
                      type="int",
                      dest='port',
                      default=-1,
                      help='Connect to server on a particular port',
                      metavar='<PORT>',
                      )

    parser.add_option('-o',
                      '--output',
                      type="string",
                      dest='outfile',
                      default="",
                      help='File to output data to',
                      metavar='<FILE>',
                      )

    parser.add_option('-m',
                      '--machines',
                      type="string",
                      action="append",
                      dest='machines',
                      default=[],
                      help='Machines to run the benchmarking on, for each machine add another one of these options',
                      metavar='<MACHINES>',
                      )

    parser.add_option('-f',
                      '--cmd-file',
                      type="string",
                      dest='cmscmdfile',
                      action="append",
                      default=[],
                      help='A files of cmsPerfSuite.py commands to execute on the machines, if more than one of these options is passed and the number of these options is the same as the number of machines, the x-th machine will use the x-th config file.',
                      metavar='<PATH>',
                      )

    (options, _args) = parser.parse_args()

    ######################
    # Check output file location
    #
    outfile = options.outfile
    if not outfile == "":
        outfile = os.path.abspath(options.outfile)
        outdir = os.path.dirname(outfile)
        if not os.path.isdir(outdir):
            parser.error("ERROR: %s is not a valid directory to create %s" % (outdir, os.path.basename(outfile)))
    else:
        outfile = os.path.join(os.getcwd(), "cmsmultiperfdata.pypickle")

    # Never overwrite the results of an earlier run.
    if os.path.exists(outfile):
        parser.error("ERROR: outfile %s already exists" % outfile)

    ###############
    # Check configuration files for errors
    #
    cmsperf_cmds = []
    cmscmdfiles = options.cmscmdfile
    if len(cmscmdfiles) <= 0:
        parser.error("A valid python file defining a list of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program")

    for cmscmdfile in cmscmdfiles:
        cmdfile = os.path.abspath(cmscmdfile)
        print(cmdfile)
        if not os.path.isfile(cmdfile):
            parser.error("ERROR: %s is not a file" % cmdfile)

        bad_list = "ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program" % cmdfile

        # Run the file with an explicit dictionary for its top-level names
        # instead of relying on execfile() writing into this function's
        # implicit locals. A fresh dictionary per file also means a
        # definition from a previous file cannot hide a missing one here.
        file_vars = {}
        try:
            execfile(cmdfile, globals(), file_vars)
        except SyntaxError:
            parser.error("ERROR: %s must be a valid python file" % cmdfile)
        except NameError as detail:
            parser.error(bad_list + ": " + str(detail))

        if "listperfsuitekeywords" not in file_vars:
            parser.error(bad_list + ": name 'listperfsuitekeywords' is not defined")
        cmds = file_vars["listperfsuitekeywords"]

        if not type(cmds) == type([]):
            parser.error(bad_list + " 2")
        if not _isValidPerfCmdsDef(cmds):
            parser.error(bad_list + " 3")
        cmsperf_cmds.append(cmds)

    ########
    # Setup port number
    #
    if options.port == -1:
        port = 8000
    else:
        port = options.port

    #################
    # Check machine hostnames
    #
    machines = options.machines
    if len(machines) <= 0:
        parser.error("you must specify at least one machine to benchmark")
    machines = [name.strip() for name in machines]

    for machine in machines:
        try:
            socket.getaddrinfo(machine, port)
        except socket.gaierror:
            parser.error("ERROR: Can not resolve machine address %s (must be ip{4,6} or hostname)" % machine)

    ##############
    # Define which configuration file to use for which machine
    # If only one configuration file is used then it is used for all machines
    cmdindex = {}  # index into cmsperf_cmds of the commands each machine runs
    if len(cmsperf_cmds) == 1:
        # all machines run the same set of commands
        for machine in machines:
            cmdindex[machine] = 0
    else:
        if not len(cmsperf_cmds) == len(machines):
            parser.error("if more than one configuration file was specified you must specify a configuration file for each machine.")

        # the i-th machine runs the i-th configuration file passed as an
        # option; the machine name must come from this loop, not from the
        # variable left over by the hostname check above
        for i, machine in enumerate(machines):
            cmdindex[machine] = i

    return (cmsperf_cmds, port, machines, outfile, cmdindex)
222 
223 #################
224 # Request benchmark
225 # Connects to server and returns data
226 # returns: profiling data from server
227 #
def request_benchmark(perfcmds, shost, sport):
    """Send ``perfcmds`` to the XML-RPC server at ``shost:sport``.

    Args:
        perfcmds: the command set passed to the server's
            ``request_benchmark`` method.
        shost: host name or address of the server.
        sport: port the server listens on.

    Returns:
        Whatever the server's ``request_benchmark`` call returns, or
        ``None`` when the call failed; in that case the error has already
        been printed to standard output.
    """
    try:
        server = xmlrpclib.ServerProxy("http://%s:%s" % (shost, sport))
        return server.request_benchmark(perfcmds)
    except socket.error as detail:
        print("ERROR: Could not communicate with server %s:%s: %s" % (shost, sport, detail))
    except xml.parsers.expat.ExpatError as detail:
        print("ERROR: XML-RPC could not be parsed: %s" % (detail,))
    except xmlrpclib.ProtocolError as detail:
        print("ERROR: XML-RPC protocol error %s try using -L xxx:localhost:xxx if using ssh to forward" % (detail,))
    except Exception as detail:
        # Catch-all for anything else raised by the call. The handler must
        # name an exception class; naming the ``exceptions`` module here
        # would never match any raised exception.
        print("ERROR: There was a runtime error thrown by server %s; detail follows." % shost)
        print(detail)
    return None
241 
242 #################
243 # Worker
244 # This is a subclass of thread that submits commands to the server and stores the result in a thread-safe queue
245 #
class Worker(threading.Thread):
    """Thread that submits one command set to one server.

    The result of ``request_benchmark`` is stored, together with the host
    name, in the queue handed to the constructor.
    """

    def __init__(self, host, port, perfcmds, queue):
        """Remember the target server, the commands and the result queue."""
        threading.Thread.__init__(self)
        self.__host = host
        self.__port = port
        self.__perfcmds = perfcmds
        self.__queue = queue

    def run(self):
        """Request the benchmark and put ``(host, data)`` in the queue.

        Any exception is printed rather than propagated, in which case
        nothing is put in the queue.
        """
        target = self.__host
        try:
            result = request_benchmark(self.__perfcmds, target, self.__port)
            # Debugging
            print("data is %s" % result)
            print("Puttin it in the queue as (%s,%s)" % (target, result))
            self.__queue.put((target, result))
        except (exceptions.Exception, xmlrpclib.Fault) as detail:
            print("Exception was thrown when receiving/submitting job information to host %s . Exception information:" % (self.__host,))
            print(detail)
            sys.stdout.flush()
266 
267 ##########################
268 # runclient
269 # Creates a thread for each machine to profile and waits for all machines to return data (you might consider adding a timeout in the while loop)
270 # If the client is killed for some reason or there is an exception, dump the data to a file before throwing the exception
def runclient(perfcmds, hosts, port, outfile, cmdindex):
    """Run the benchmark on every host and pickle the collected results.

    Starts one ``Worker`` thread per host, polls every two seconds until
    all of them have finished, then writes whatever they queued to
    ``outfile`` through ``presentBenchmarkData``.

    Args:
        perfcmds: list of command sets, indexed through ``cmdindex``.
        hosts: the machines to benchmark.
        port: port the servers listen on.
        outfile: path of the file the results are pickled to.
        cmdindex: maps each host to the index of its command set in
            ``perfcmds``.

    Raises:
        Whatever interrupts the wait (for example ``KeyboardInterrupt``);
        the data received so far is dumped to ``outfile`` first.
    """
    queue = Queue.Queue()
    # start all threads
    workers = []
    for host in hosts:
        print("Submitting jobs to %s..." % host)
        w = Worker(host, port, perfcmds[cmdindex[host]], queue)
        w.start()
        workers.append(w)
    print("All jobs submitted, waiting for results...")
    sys.stdout.flush()
    # run until all servers have returned data
    try:
        # any() is False for an empty worker list, so no hosts simply means
        # there is nothing to wait for.
        while any(w.is_alive() for w in workers):
            time.sleep(2.0)
            sys.stdout.flush()
    except BaseException:
        # cleanup: also covers KeyboardInterrupt and SystemExit, so partial
        # results are saved before the exception continues upwards
        presentBenchmarkData(queue, outfile)
        raise
    print("All job results received")
    print("The size with the queue containing all data is: %s " % queue.qsize())
    presentBenchmarkData(queue, outfile)
298 
299 ########################################
300 #
301 # Format of the returned data from remote host should be of the form (this could be cleaned up a little bit)
302 #
303 # list of command outputs [ dictionary of cpus { } ]
304 #
305 # For example:
306 # returned data = [ cmd_output1, cmd_output2 ... ]
307 # cmd_output1 = { cpuid1 : cpu_output1, cpuid2 : cpu_output2 ... } # cpuid is "None" if there was only one cpu used
308 # cpu_output1 = { candle1 : profset_output1, candle2 : profset_output2 ... }
309 # profset_output1 = { profset1 : profile_output1, ... }
310 # profile_output1 = { profiletype1: step_output1, ... }
311 # step_output1 = { step1: list_of_cpu_times, ... }
312 # list_of_cpu_times = [ (evt_num1, secs1), ... ]
313 
314 ###########
315 #
316 # We now massage the data
317 #
def presentBenchmarkData(q, outfile):
    """Drain the result queue and pickle its contents to ``outfile``.

    Args:
        q: queue of ``(host, data)`` pairs; it is emptied by this call.
        outfile: path of the file to write; it is opened in binary mode
            and overwritten.

    The pickled object is a list of the ``(host, data)`` tuples in the
    order they were taken from the queue. The list is also printed.
    """
    print("Pickling data to file %s" % outfile)
    out = []  # one (host, data) entry per result found in the queue
    while not q.empty():
        print("Queue size is still %s" % q.qsize())
        (host, data) = q.get()
        out.append((host, data))
    print("Dumping at screen the output!\n%s" % out)
    # The with-block closes the file even when pickling fails.
    with open(outfile, "wb") as oh:
        pickle.dump(out, oh)
330 
def _main():
    """Parse the command line, then benchmark the requested machines."""
    perfcmds, server_port, machine_names, pickle_path, index = optionparse()
    runclient(perfcmds, machine_names, server_port, pickle_path, index)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    _main()
Worker This is a subclass of thread that submits commands to the server and stores the result in a th...
def optionparse
Option parser returns : Command set to run on each (or all) machines, port to connect to server...
def presentBenchmarkData
Format of the returned data from remote host should be of the form (this could be cleaned up a little...
def request_benchmark
Request benchmark Connects to server and returns data returns: profiling data from server...