CMS 3D CMS Logo

cmsPerfClient.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import socket, xml, xmlrpclib, os, sys, threading, Queue, time, random, pickle, exceptions
3 import optparse as opt
4 from functools import reduce
5 #Documentation needs to follow... but for now just know that
6 #a template file for cmsPerfClient.py -f option is BencmarkCfg.py in Validation/Performance/python dir.
7 PROG_NAME = os.path.basename(sys.argv[0])
8 # list of valid options for the configuration file
9 validPerfSuitKeys= ["castordir", "perfsuitedir" ,"TimeSizeEvents", "TimeSizeCandles","IgProfEvents", "IgProfCandles", "CallgrindEvents", "CallgrindCandles", "MemcheckEvents","MemcheckCandles","cmsScimark", "cmsScimarkLarge",
10  "cmsdriverOptions", "stepOptions", "quicktest", "profilers", "cpus", "cores", "prevrel", "isAllCandles", "candles",
11  "bypasshlt", "runonspare", "logfile"]
12 #################
13 #
14 # Option parser
15 # returns : Command set to run on each (or all) machines, port to connect to server,
16 # List of machines to connect to, File to pickle results to,
17 # Dictionary to index which command set to use for which machine
19 
20  #########################
21  # Config file type validator
22  # Checks type of configuration options in the config file
23  #
24  def _isValidPerfCmdsDef(alist):
25  out = True
26  for item in alist:
27  isdict = isinstance(item, type({}))
28  out = out and isdict
29  if isdict:
30  for key in item:
31  out = out and key in validPerfSuitKeys
32  if key == "cpus":
33  out = out and isinstance(item[key], type("")) #has to be a string not a list!
34  elif key == "cores":
35  out = out and isinstance(item[key], type(""))
36  elif key == "castordir":
37  out = out and isinstance(item[key], type(""))
38  elif key == "perfsuitedir":
39  out = out and isinstance(item[key], type(""))
40  elif key == "TimeSizeEvents":
41  out = out and isinstance(item[key], type(123))
42  elif key == "TimeSizeCandles":
43  out = out and isinstance(item[key], type(""))
44  elif key == "CallgrindEvents":
45  out = out and isinstance(item[key], type(123))
46  elif key == "CallgrindCandles":
47  out = out and isinstance(item[key], type(""))
48  elif key == "IgProfEvents":
49  out = out and isinstance(item[key], type(123))
50  elif key == "IgProfCandles":
51  out = out and isinstance(item[key], type(""))
52  elif key == "MemcheckEvents":
53  out = out and isinstance(item[key], type(123))
54  elif key == "MemcheckCandles":
55  out = out and isinstance(item[key], type(""))
56  elif key == "cmsScimark":
57  out = out and isinstance(item[key], type(123))
58  elif key == "cmsScimarkLarge":
59  out = out and isinstance(item[key], type(123))
60  elif key == "cmsdriverOptions":
61  out = out and isinstance(item[key], type(""))
62  elif key == "stepOptions":
63  out = out and isinstance(item[key], type(""))
64  elif key == "quicktest":
65  out = out and isinstance(item[key], type(False))
66  elif key == "profilers":
67  out = out and isinstance(item[key], type(""))
68  elif key == "prevrel":
69  out = out and isinstance(item[key], type(""))
70  elif key == "isAllCandles":
71  out = out and isinstance(item[key], type(False))
72  elif key == "candles":
73  out = out and isinstance(item[key], type(""))#has to be a string not a list!
74  elif key == "bypasshlt":
75  out = out and isinstance(item[key], type(False))
76  elif key == "runonspare":
77  out = out and isinstance(item[key], type(False))
78  elif key == "logfile":
79  out = out and isinstance(item[key], type(""))
80  return out
81 
82  parser = opt.OptionParser(usage=("""%s [Options]""" % PROG_NAME))
83 
84  parser.add_option('-p',
85  '--port',
86  type="int",
87  dest='port',
88  default=-1,
89  help='Connect to server on a particular port',
90  metavar='<PORT>',
91  )
92 
93  parser.add_option('-o',
94  '--output',
95  type="string",
96  dest='outfile',
97  default="",
98  help='File to output data to',
99  metavar='<FILE>',
100  )
101 
102  parser.add_option('-m',
103  '--machines',
104  type="string",
105  action="append",
106  dest='machines',
107  default=[],
108  help='Machines to run the benchmarking on, for each machine add another one of these options',
109  metavar='<MACHINES>',
110  )
111 
112  parser.add_option('-f',
113  '--cmd-file',
114  type="string",
115  dest='cmscmdfile',
116  action="append",
117  default=[],
118  help='A files of cmsPerfSuite.py commands to execute on the machines, if more than one of these options is passed and the number of these options is the same as the number of machines, the x-th machine will use the x-th config file.',
119  metavar='<PATH>',
120  )
121 
122  (options, args) = parser.parse_args()
123 
124  ######################
125  # Check output file location
126  #
127  outfile = options.outfile
128  if not outfile == "":
129  outfile = os.path.abspath(options.outfile)
130  outdir = os.path.dirname(outfile)
131  if not os.path.isdir(outdir):
132  parser.error("ERROR: %s is not a valid directory to create %s" % (outdir,os.path.basename(outfile)))
133  sys.exit()
134  else:
135  outfile = os.path.join(os.getcwd(),"cmsmultiperfdata.pypickle")
136 
137  if os.path.exists(outfile):
138  parser.error("ERROR: outfile %s already exists" % outfile)
139  sys.exit()
140 
141 
142  ###############
143  # Check configuration files for errors
144  #
145  cmsperf_cmds = []
146  cmscmdfiles = options.cmscmdfile
147  if len(cmscmdfiles) <= 0:
148  parser.error("A valid python file defining a list of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program")
149  sys.exit()
150  else:
151  for cmscmdfile in cmscmdfiles:
152  cmdfile = os.path.abspath(cmscmdfile)
153  print cmdfile
154  if os.path.isfile(cmdfile):
155  try:
156  execfile(cmdfile)
157  cmsperf_cmds.append(listperfsuitekeywords)
158  except (SyntaxError) as detail:
159  parser.error("ERROR: %s must be a valid python file" % cmdfile)
160  sys.exit()
161  except (NameError) as detail:
162  parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program: %s" % (cmdfile,str(detail)))
163  sys.exit()
164  except :
165  raise
166  if not isinstance(cmsperf_cmds[-1], type([])):
167  parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program 2" % cmdfile)
168  sys.exit()
169  if not _isValidPerfCmdsDef(cmsperf_cmds[-1]):
170  parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program 3" % cmdfile)
171  sys.exit()
172 
173  else:
174  parser.error("ERROR: %s is not a file" % cmdfile)
175  sys.exit()
176 
177  ########
178  # Setup port number
179  #
180  port = 0
181  if options.port == -1:
182  port = 8000
183  else:
184  port = options.port
185 
186  machines = options.machines
187 
188  #################
189  # Check machine hostnames
190  #
191  if len(machines) <= 0:
192  parser.error("you must specify at least one machine to benchmark")
193  else:
194  machines = map(lambda x: x.strip(),machines)
195 
196  for machine in machines:
197  try:
198  output = socket.getaddrinfo(machine,port)
199  except socket.gaierror:
200  parser.error("ERROR: Can not resolve machine address %s (must be ip{4,6} or hostname)" % machine)
201  sys.exit()
202 
203  ##############
204  # Define which configuration file to use for which machine
205  # If only one configuration file is used then it used for all machines
206  cmdindex = {} # define an index that defines the commands to be run for each machine to be perfsuite'd
207  if len(cmsperf_cmds) == 1:
208  for machine in machines:
209  # each value is the index in cmsperf_cmds that the machine will run
210  # in this case all machines run the same set of commands
211  cmdindex[machine] = 0
212  else:
213  if not len(cmsperf_cmds) == len(machines):
214  parser.error("if more than one configuration file was specified you must specify a configuration file for each machine.")
215  sys.exit()
216 
217  for i in range(len(machines)):
218  # each value is the index in cmsperf_cmds that the machine will run
219  # in this case each machine runs the i-th configuration file passed as an option
220  cmdindex[machine] = i
221 
222  return (cmsperf_cmds, port, machines, outfile, cmdindex)
223 
224 #################
225 # Request benchmark
226 # Connects to server and returns data
227 # returns: profiling data from server
228 #
229 def request_benchmark(perfcmds,shost,sport):
230  try:
231  server = xmlrpclib.ServerProxy("http://%s:%s" % (shost,sport))
232  return server.request_benchmark(perfcmds)
233  except socket.error as detail:
234  print "ERROR: Could not communicate with server %s:%s:" % (shost,sport), detail
235  except xml.parsers.expat.ExpatError as detail:
236  print "ERROR: XML-RPC could not be parsed:", detail
237  except xmlrpclib.ProtocolError as detail:
238  print "ERROR: XML-RPC protocol error", detail, "try using -L xxx:localhost:xxx if using ssh to forward"
239  except exceptions as detail:
240  print "ERROR: There was a runtime error thrown by server %s; detail follows." % shost
241  print detail
242 
243 #################
244 # Worker
245 # This is a subclass of thread that submits commands to the server and stores the result in a thread-safe queue
246 #
247 class Worker(threading.Thread):
248 
249  def __init__(self, host, port, perfcmds, queue):
250  self.__perfcmds = perfcmds
251  self.__host = host
252  self.__port = port
253  self.__queue = queue
254  threading.Thread.__init__(self)
255 
256  def run(self):
257  try:
258  data = request_benchmark(self.__perfcmds, self.__host, self.__port)
259  #Debugging
260  print "data is %s"%data
261  print "Puttin it in the queue as (%s,%s)"%(self.__host,data)
262  self.__queue.put((self.__host, data))
263  except (exceptions.Exception, xmlrpclib.Fault) as detail:
264  print "Exception was thrown when receiving/submitting job information to host", self.__host, ". Exception information:"
265  print detail
266  sys.stdout.flush()
267 
268 ##########################
269 # runclient
270 # Creates a thread for each machine to profile and waits for all machines to return data (you might consider adding a timeout in the while loop)
271 # If the client is killed for some reason or there is an exception, dump the data to a file before throwing the exception
272 def runclient(perfcmds, hosts, port, outfile, cmdindex):
273  queue = Queue.Queue()
274  # start all threads
275  workers = []
276  for host in hosts:
277  print "Submitting jobs to %s..." % host
278  w = Worker(host, port, perfcmds[cmdindex[host]], queue)
279  w.start()
280  workers.append(w)
281  print "All jobs submitted, waiting for results..."
282  sys.stdout.flush()
283  # run until all servers have returned data
284  while reduce(lambda x,y: x or y, map(lambda x: x.isAlive(),workers)):
285  try:
286  time.sleep(2.0)
287  sys.stdout.flush()
288  except (KeyboardInterrupt, SystemExit):
289  #cleanup
290  presentBenchmarkData(queue,outfile)
291  raise
292  except:
293  #cleanup
294  presentBenchmarkData(queue,outfile)
295  raise
296  print "All job results received"
297  print "The size with the queue containing all data is: %s "%queue.qsize()
298  presentBenchmarkData(queue,outfile)
299 
300 ########################################
301 #
302 # Format of the returned data from remote host should be of the form (this could be cleaned up a little bit)
303 #
304 # list of command outputs [ dictionary of cpus { } ]
305 #
306 # For example:
307 # returned data = [ cmd_output1, cmd_output2 ... ]
308 # cmd_output1 = { cpuid1 : cpu_output1, cpuid2 : cpu_output2 ... } # cpuid is "None" if there was only one cpu used
309 # cpu_output1 = { candle1 : profset_output1, candle2 : profset_output2 ... }
310 # profset_output1 = { profset1 : profile_output1, ... }
311 # profile_output1 = { profiletype1: step_output1, ... }
312 # step_output1 = { step1: list_of_cpu_times, ... }
313 # list_of_cpu_times = [ (evt_num1, secs1), ... ]
314 
315 ###########
316 #
317 # We now massage the data
318 #
319 def presentBenchmarkData(q,outfile):
320  print "Pickling data to file %s"%outfile
321  out = [] # match up the commands with each
322  # command that was passed in the config file
323  while not q.empty():
324  print "Queue size is still %s"%q.qsize()
325  (host, data) = q.get()
326  out.append((host,data))
327  print "Dumping at screen the output!\n%s"%out
328  oh = open(outfile,"wb")
329  pickle.dump(out,oh)
330  oh.close()
331 
332 def _main():
333  (cmsperf_cmds, port, hosts, outfile, cmdindex) = optionparse()
334  runclient(cmsperf_cmds, hosts, port, outfile, cmdindex)
335 
336 if __name__ == "__main__":
337  _main()
def optionparse()
Option parser returns : Command set to run on each (or all) machines, port to connect to server...
Worker This is a subclass of thread that submits commands to the server and stores the result in a th...
def presentBenchmarkData(q, outfile)
Format of the returned data from remote host should be of the form (this could be cleaned up a little...
def __init__(self, host, port, perfcmds, queue)
def runclient(perfcmds, hosts, port, outfile, cmdindex)
def request_benchmark(perfcmds, shost, sport)
Request benchmark Connects to server and returns data returns: profiling data from server...
#define str(s)