CMS 3D CMS Logo

cmsPerfClient.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import socket, xml, xmlrpclib, os, sys, threading, Queue, time, random, pickle, exceptions
4 import optparse as opt
5 from functools import reduce
# Full documentation is still to be written. Until then: a template for the
# file given to the cmsPerfClient.py -f option is BencmarkCfg.py in the
# Validation/Performance/python dir.
PROG_NAME = os.path.basename(sys.argv[0])

# Every key that a dictionary in a configuration file is allowed to use.
validPerfSuitKeys = [
    "castordir",
    "perfsuitedir",
    "TimeSizeEvents",
    "TimeSizeCandles",
    "IgProfEvents",
    "IgProfCandles",
    "CallgrindEvents",
    "CallgrindCandles",
    "MemcheckEvents",
    "MemcheckCandles",
    "cmsScimark",
    "cmsScimarkLarge",
    "cmsdriverOptions",
    "stepOptions",
    "quicktest",
    "profilers",
    "cpus",
    "cores",
    "prevrel",
    "isAllCandles",
    "candles",
    "bypasshlt",
    "runonspare",
    "logfile",
]
13 #################
14 #
15 # Option parser
16 # returns : Command set to run on each (or all) machines, port to connect to server,
17 # List of machines to connect to, File to pickle results to,
18 # Dictionary to index which command set to use for which machine
20 
def optionparse():
    """Parse and validate the command line of this client.

    Returns:
        A tuple ``(cmsperf_cmds, port, machines, outfile, cmdindex)``:
          * cmsperf_cmds -- one entry per -f option; each entry is the
            ``listperfsuitekeywords`` list defined by that configuration file.
          * port         -- port to connect to (8000 unless -p is given).
          * machines     -- stripped host names given with -m.
          * outfile      -- absolute path of the pickle file to create.
          * cmdindex     -- maps each machine to the index in cmsperf_cmds of
            the command set it should run.

    Any invalid input is reported through ``parser.error()``, which prints
    the message and terminates the program, so no explicit exit is needed
    after it.
    """

    # Type that the value stored under each configuration key must have.
    # "cpus" and "candles" have to be strings, not lists.
    expected_types = {
        "cpus": str,
        "cores": str,
        "castordir": str,
        "perfsuitedir": str,
        "TimeSizeEvents": int,
        "TimeSizeCandles": str,
        "CallgrindEvents": int,
        "CallgrindCandles": str,
        "IgProfEvents": int,
        "IgProfCandles": str,
        "MemcheckEvents": int,
        "MemcheckCandles": str,
        "cmsScimark": int,
        "cmsScimarkLarge": int,
        "cmsdriverOptions": str,
        "stepOptions": str,
        "quicktest": bool,
        "profilers": str,
        "prevrel": str,
        "isAllCandles": bool,
        "candles": str,
        "bypasshlt": bool,
        "runonspare": bool,
        "logfile": str,
    }

    #########################
    # Config file type validator
    # Checks type of configuration options in the config file
    #
    def _isValidPerfCmdsDef(alist):
        """Return True when every item of alist is a dict that only uses
        keys from validPerfSuitKeys with values of the expected type."""
        for item in alist:
            if not isinstance(item, dict):
                return False
            for key, value in item.items():
                if key not in validPerfSuitKeys:
                    return False
                expected = expected_types.get(key)
                if expected is not None and not isinstance(value, expected):
                    return False
        return True

    parser = opt.OptionParser(usage=("""%s [Options]""" % PROG_NAME))

    parser.add_option('-p',
                      '--port',
                      type="int",
                      dest='port',
                      default=-1,
                      help='Connect to server on a particular port',
                      metavar='<PORT>',
                      )

    parser.add_option('-o',
                      '--output',
                      type="string",
                      dest='outfile',
                      default="",
                      help='File to output data to',
                      metavar='<FILE>',
                      )

    parser.add_option('-m',
                      '--machines',
                      type="string",
                      action="append",
                      dest='machines',
                      default=[],
                      help='Machines to run the benchmarking on, for each machine add another one of these options',
                      metavar='<MACHINES>',
                      )

    parser.add_option('-f',
                      '--cmd-file',
                      type="string",
                      dest='cmscmdfile',
                      action="append",
                      default=[],
                      help='A files of cmsPerfSuite.py commands to execute on the machines, if more than one of these options is passed and the number of these options is the same as the number of machines, the x-th machine will use the x-th config file.',
                      metavar='<PATH>',
                      )

    (options, args) = parser.parse_args()

    ######################
    # Check output file location
    #
    outfile = options.outfile
    if outfile != "":
        outfile = os.path.abspath(options.outfile)
        outdir = os.path.dirname(outfile)
        if not os.path.isdir(outdir):
            parser.error("ERROR: %s is not a valid directory to create %s" % (outdir, os.path.basename(outfile)))
    else:
        outfile = os.path.join(os.getcwd(), "cmsmultiperfdata.pypickle")

    # Refuse to overwrite the results of an earlier run.
    if os.path.exists(outfile):
        parser.error("ERROR: outfile %s already exists" % outfile)

    ###############
    # Check configuration files for errors
    #
    cmsperf_cmds = []
    cmscmdfiles = options.cmscmdfile
    if not cmscmdfiles:
        parser.error("A valid python file defining a list of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program")

    for cmscmdfile in cmscmdfiles:
        cmdfile = os.path.abspath(cmscmdfile)
        print(cmdfile)
        if not os.path.isfile(cmdfile):
            parser.error("ERROR: %s is not a file" % cmdfile)

        # NOTE(security): the configuration file is executed as Python code;
        # only pass files you trust.
        # The file is executed in an explicit namespace because execfile()
        # cannot reliably create local variables of the calling function.
        namespace = {}
        try:
            execfile(cmdfile, namespace)
        except SyntaxError:
            parser.error("ERROR: %s must be a valid python file" % cmdfile)
        except NameError as detail:
            parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program: %s" % (cmdfile, str(detail)))

        if "listperfsuitekeywords" not in namespace:
            parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program: %s" % (cmdfile, "name 'listperfsuitekeywords' is not defined"))

        perfsuite_cmds = namespace["listperfsuitekeywords"]
        if not isinstance(perfsuite_cmds, list):
            parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program 2" % cmdfile)
        if not _isValidPerfCmdsDef(perfsuite_cmds):
            parser.error("ERROR: %s must contain a list (variable named listperfsuitekeywords) of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program 3" % cmdfile)
        cmsperf_cmds.append(perfsuite_cmds)

    ########
    # Setup port number
    #
    port = 8000 if options.port == -1 else options.port

    #################
    # Check machine hostnames
    #
    if not options.machines:
        parser.error("you must specify at least one machine to benchmark")
    machines = [name.strip() for name in options.machines]

    for machine in machines:
        try:
            socket.getaddrinfo(machine, port)
        except socket.gaierror:
            parser.error("ERROR: Can not resolve machine address %s (must be ip{4,6} or hostname)" % machine)

    ##############
    # Define which configuration file to use for which machine
    # If only one configuration file is used then it used for all machines
    cmdindex = {}  # machine -> index in cmsperf_cmds of the commands it runs
    if len(cmsperf_cmds) == 1:
        # all machines run the same set of commands
        for machine in machines:
            cmdindex[machine] = 0
    else:
        if len(cmsperf_cmds) != len(machines):
            parser.error("if more than one configuration file was specified you must specify a configuration file for each machine.")

        # the i-th machine runs the i-th configuration file passed as an option
        for i, machine in enumerate(machines):
            cmdindex[machine] = i

    return (cmsperf_cmds, port, machines, outfile, cmdindex)
224 
225 #################
226 # Request benchmark
227 # Connects to server and returns data
228 # returns: profiling data from server
229 #
def request_benchmark(perfcmds, shost, sport):
    """Ask the XML-RPC server at shost:sport to run perfcmds.

    Returns the value returned by the server's ``request_benchmark`` method.
    If the call fails, the error is printed and None is returned (implicitly).
    """
    # "import xml" alone does not guarantee that the expat submodule is
    # loaded, so bring the exception class into scope explicitly.
    from xml.parsers.expat import ExpatError

    try:
        server = xmlrpclib.ServerProxy("http://%s:%s" % (shost, sport))
        return server.request_benchmark(perfcmds)
    except socket.error as detail:
        print("ERROR: Could not communicate with server %s:%s:" % (shost, sport), detail)
    except ExpatError as detail:
        print("ERROR: XML-RPC could not be parsed:", detail)
    except xmlrpclib.ProtocolError as detail:
        print("ERROR: XML-RPC protocol error", detail, "try using -L xxx:localhost:xxx if using ssh to forward")
    except Exception as detail:
        # The original clause named the "exceptions" module instead of an
        # exception class and therefore could never match.
        print("ERROR: There was a runtime error thrown by server %s; detail follows." % shost)
        print(detail)
243 
244 #################
245 # Worker
246 # This is a subclass of thread that submits commands to the server and stores the result in a thread-safe queue
247 #
class Worker(threading.Thread):
    """Thread that sends one command set to one server and puts the
    (host, data) result on the queue it was given."""

    def __init__(self, host, port, perfcmds, queue):
        super(Worker, self).__init__()
        self.__host = host
        self.__port = port
        self.__perfcmds = perfcmds
        self.__queue = queue

    def run(self):
        """Request the benchmark and queue the answer; print any exception."""
        try:
            result = request_benchmark(self.__perfcmds, self.__host, self.__port)
            entry = (self.__host, result)
            # Debugging
            print("data is %s" % result)
            print("Puttin it in the queue as (%s,%s)" % entry)
            self.__queue.put(entry)
        except (exceptions.Exception, xmlrpclib.Fault) as err:
            print("Exception was thrown when receiving/submitting job information to host", self.__host, ". Exception information:")
            print(err)
            sys.stdout.flush()
268 
269 ##########################
270 # runclient
271 # Creates a thread for each machine to profile and waits for all machines to return data (you might consider adding a timeout in the while loop)
272 # If the client is killed for some reason or there is an exception, dump the data to a file before throwing the exception
def runclient(perfcmds, hosts, port, outfile, cmdindex):
    """Run the benchmark on every host and pickle the collected results.

    One Worker thread is started per entry of hosts; each one is given
    perfcmds[cmdindex[host]]. The function polls every two seconds until
    no worker is alive any more, then writes the queued results to outfile
    through presentBenchmarkData().

    If the wait is interrupted by any exception (including
    KeyboardInterrupt and SystemExit) the data received so far is written
    to outfile before the exception is re-raised.
    """
    queue = Queue.Queue()
    # start all threads
    workers = []
    for host in hosts:
        print("Submitting jobs to %s..." % host)
        w = Worker(host, port, perfcmds[cmdindex[host]], queue)
        w.start()
        workers.append(w)
    print("All jobs submitted, waiting for results...")
    sys.stdout.flush()
    # run until all servers have returned data
    # any() is False for an empty worker list, where reduce() without an
    # initial value would raise TypeError.
    while any(w.is_alive() for w in workers):
        try:
            time.sleep(2.0)
            sys.stdout.flush()
        except BaseException:
            # cleanup: save whatever has been received, then propagate
            presentBenchmarkData(queue, outfile)
            raise
    print("All job results received")
    print("The size with the queue containing all data is: %s " % queue.qsize())
    presentBenchmarkData(queue, outfile)
300 
301 ########################################
302 #
303 # Format of the returned data from remote host should be of the form (this could be cleaned up a little bit)
304 #
305 # list of command outputs [ dictionary of cpus { } ]
306 #
307 # For example:
308 # returned data = [ cmd_output1, cmd_output2 ... ]
309 # cmd_output1 = { cpuid1 : cpu_output1, cpuid2 : cpu_output2 ... } # cpuid is "None" if there was only one cpu used
310 # cpu_output1 = { candle1 : profset_output1, candle2 : profset_output2 ... }
311 # profset_output1 = { profset1 : profile_output1, ... }
312 # profile_output1 = { profiletype1: step_output1, ... }
313 # step_output1 = { step1: list_of_cpu_times, ... }
314 # list_of_cpu_times = [ (evt_num1, secs1), ... ]
315 
316 ###########
317 #
318 # We now massage the data
319 #
def presentBenchmarkData(q, outfile):
    """Drain the queue q and pickle its contents to outfile.

    Every queued item is expected to be a (host, data) pair. The pairs are
    collected, in the order they are taken from the queue, into a list that
    is printed and then pickled to outfile (opened in binary write mode).
    """
    print("Pickling data to file %s" % outfile)
    out = []  # (host, data) pairs in the order they are taken from the queue
    while not q.empty():
        print("Queue size is still %s" % q.qsize())
        (host, data) = q.get()
        out.append((host, data))
    print("Dumping at screen the output!\n%s" % out)
    # "with" closes the file even when pickling fails.
    with open(outfile, "wb") as oh:
        pickle.dump(out, oh)
332 
def _main():
    """Script entry point: parse the options, then run the client."""
    parsed = optionparse()
    commands, server_port, machine_names, pickle_path, command_index = parsed
    runclient(commands, machine_names, server_port, pickle_path, command_index)


if __name__ == "__main__":
    _main()
def optionparse()
Option parser returns : Command set to run on each (or all) machines, port to connect to server...
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
Worker This is a subclass of thread that submits commands to the server and stores the result in a th...
def presentBenchmarkData(q, outfile)
Format of the returned data from remote host should be of the form (this could be cleaned up a little...
def __init__(self, host, port, perfcmds, queue)
def runclient(perfcmds, hosts, port, outfile, cmdindex)
def request_benchmark(perfcmds, shost, sport)
Request benchmark Connects to server and returns data returns: profiling data from server...
#define str(s)