CMS 3D CMS Logo

cmsPerfClient.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from builtins import range
4 import socket, xml, xmlrpclib, os, sys, threading, Queue, time, random, pickle, exceptions
5 import optparse as opt
6 from functools import reduce
# Full documentation is still to be written.  For now: a template configuration
# file for the cmsPerfClient.py -f option is BencmarkCfg.py in the
# Validation/Performance/python dir.
PROG_NAME = os.path.basename(sys.argv[0])

# Every keyword that may appear in a configuration-file dictionary.
validPerfSuitKeys = [
    "castordir",
    "perfsuitedir",
    "TimeSizeEvents",
    "TimeSizeCandles",
    "IgProfEvents",
    "IgProfCandles",
    "CallgrindEvents",
    "CallgrindCandles",
    "MemcheckEvents",
    "MemcheckCandles",
    "cmsScimark",
    "cmsScimarkLarge",
    "cmsdriverOptions",
    "stepOptions",
    "quicktest",
    "profilers",
    "cpus",
    "cores",
    "prevrel",
    "isAllCandles",
    "candles",
    "bypasshlt",
    "runonspare",
    "logfile",
]
#################
#
# Option parser
# returns : Command set to run on each (or all) machines, port to connect to server,
#           List of machines to connect to, File to pickle results to,
#           Dictionary to index which command set to use for which machine
#

# Expected value type of each cmsPerfSuite keyword accepted in a configuration
# file (checked by _isValidPerfCmdsDef).  "cpus" and "candles" must be strings,
# not lists.
_PERFSUITE_KEY_TYPES = {
    "castordir": str,
    "perfsuitedir": str,
    "TimeSizeEvents": int,
    "TimeSizeCandles": str,
    "IgProfEvents": int,
    "IgProfCandles": str,
    "CallgrindEvents": int,
    "CallgrindCandles": str,
    "MemcheckEvents": int,
    "MemcheckCandles": str,
    "cmsScimark": int,
    "cmsScimarkLarge": int,
    "cmsdriverOptions": str,
    "stepOptions": str,
    "quicktest": bool,
    "profilers": str,
    "cpus": str,
    "cores": str,
    "prevrel": str,
    "isAllCandles": bool,
    "candles": str,
    "bypasshlt": bool,
    "runonspare": bool,
    "logfile": str,
}

_BAD_CMDFILE_MSG = ("ERROR: %s must contain a list (variable named listperfsuitekeywords) of "
                    "dictionaries that represents a list of cmsPerfSuite keyword arguments must "
                    "be passed to this program")


def _isValidPerfCmdsDef(alist):
    """Return True if every element of *alist* is a well-formed keyword dict.

    An element is well formed when it is a dict, each of its keys is listed in
    ``validPerfSuitKeys``, and each value has the type given in
    ``_PERFSUITE_KEY_TYPES`` (checked with isinstance, so a bool also passes as
    an int).  An empty list is valid.
    """
    for item in alist:
        if not isinstance(item, dict):
            return False
        for key, value in item.items():
            if key not in validPerfSuitKeys:
                return False
            expected = _PERFSUITE_KEY_TYPES.get(key)
            if expected is not None and not isinstance(value, expected):
                return False
    return True


def _resolve_outfile(parser, outfile):
    """Return the absolute path of the pickle file to write.

    An empty *outfile* selects ``cmsmultiperfdata.pypickle`` in the current
    working directory.  A missing target directory or an already existing
    file is reported via ``parser.error`` (which exits).
    """
    if outfile:
        outfile = os.path.abspath(outfile)
        outdir = os.path.dirname(outfile)
        if not os.path.isdir(outdir):
            parser.error("ERROR: %s is not a valid directory to create %s" % (outdir, os.path.basename(outfile)))
    else:
        outfile = os.path.join(os.getcwd(), "cmsmultiperfdata.pypickle")

    # Refuse to overwrite an existing file.
    if os.path.exists(outfile):
        parser.error("ERROR: outfile %s already exists" % outfile)
    return outfile


def _load_perf_cmds(parser, cmscmdfiles):
    """Execute each configuration file and collect its ``listperfsuitekeywords``.

    Returns one list of keyword dicts per file, in the order the files were
    given.  Any invalid file is reported via ``parser.error`` (which exits).
    """
    if len(cmscmdfiles) <= 0:
        parser.error("A valid python file defining a list of dictionaries that represents a list of cmsPerfSuite keyword arguments must be passed to this program")

    cmsperf_cmds = []
    for cmscmdfile in cmscmdfiles:
        cmdfile = os.path.abspath(cmscmdfile)
        print(cmdfile)
        if not os.path.isfile(cmdfile):
            parser.error("ERROR: %s is not a file" % cmdfile)

        # Execute the config in an explicit namespace seeded with this module's
        # globals (what execfile() used to see).  A bare execfile() inside a
        # function writes into a locals snapshot that the following lookup of
        # listperfsuitekeywords (a global lookup) never reads, and execfile()
        # does not exist in Python 3.
        namespace = dict(globals())
        try:
            with open(cmdfile) as fh:
                source = fh.read()
            exec(compile(source, cmdfile, "exec"), namespace)
        except SyntaxError:
            parser.error("ERROR: %s must be a valid python file" % cmdfile)
        except NameError as detail:
            parser.error((_BAD_CMDFILE_MSG + ": %s") % (cmdfile, str(detail)))

        if "listperfsuitekeywords" not in namespace:
            parser.error((_BAD_CMDFILE_MSG + ": %s") % (cmdfile, "name 'listperfsuitekeywords' is not defined"))

        perfcmds = namespace["listperfsuitekeywords"]
        if not isinstance(perfcmds, list):
            parser.error((_BAD_CMDFILE_MSG + " 2") % cmdfile)
        if not _isValidPerfCmdsDef(perfcmds):
            parser.error((_BAD_CMDFILE_MSG + " 3") % cmdfile)
        cmsperf_cmds.append(perfcmds)
    return cmsperf_cmds


def _build_cmdindex(parser, cmsperf_cmds, machines):
    """Map each machine to the index in *cmsperf_cmds* of the command set it runs.

    With a single configuration file, every machine uses index 0.  Otherwise
    there must be exactly one file per machine, and the i-th machine uses the
    i-th file.
    """
    if len(cmsperf_cmds) == 1:
        return dict((machine, 0) for machine in machines)
    if len(cmsperf_cmds) != len(machines):
        parser.error("if more than one configuration file was specified you must specify a configuration file for each machine.")
    # Key by the i-th machine itself (the old loop reused a stale loop variable
    # and mapped only one host).
    return dict((machine, i) for i, machine in enumerate(machines))


def optionparse():
    """Parse and validate the command line of the benchmarking client.

    Returns ``(cmsperf_cmds, port, machines, outfile, cmdindex)``:
      cmsperf_cmds -- list with one list of cmsPerfSuite keyword dicts per -f file
      port         -- server port (8000 unless -p/--port is given)
      machines     -- list of whitespace-stripped host names from -m
      outfile      -- absolute path of the pickle file to create
      cmdindex     -- dict mapping each machine to its index in cmsperf_cmds
    Invalid input is reported via ``parser.error``, which prints the usage
    and exits.
    """
    parser = opt.OptionParser(usage=("""%s [Options]""" % PROG_NAME))

    parser.add_option('-p',
                      '--port',
                      type="int",
                      dest='port',
                      default=-1,
                      help='Connect to server on a particular port',
                      metavar='<PORT>',
                      )

    parser.add_option('-o',
                      '--output',
                      type="string",
                      dest='outfile',
                      default="",
                      help='File to output data to',
                      metavar='<FILE>',
                      )

    parser.add_option('-m',
                      '--machines',
                      type="string",
                      action="append",
                      dest='machines',
                      default=[],
                      help='Machines to run the benchmarking on, for each machine add another one of these options',
                      metavar='<MACHINES>',
                      )

    parser.add_option('-f',
                      '--cmd-file',
                      type="string",
                      dest='cmscmdfile',
                      action="append",
                      default=[],
                      help='A files of cmsPerfSuite.py commands to execute on the machines, if more than one of these options is passed and the number of these options is the same as the number of machines, the x-th machine will use the x-th config file.',
                      metavar='<PATH>',
                      )

    options, _ = parser.parse_args()

    outfile = _resolve_outfile(parser, options.outfile)
    cmsperf_cmds = _load_perf_cmds(parser, options.cmscmdfile)

    # -1 is the "not given" default of -p/--port.
    port = 8000 if options.port == -1 else options.port

    # A real list, not map(): on Python 3 map() is a one-shot iterator, and
    # machines is iterated several times and passed to len() below.
    machines = [machine.strip() for machine in options.machines]
    if not machines:
        parser.error("you must specify at least one machine to benchmark")
    for machine in machines:
        try:
            socket.getaddrinfo(machine, port)
        except socket.gaierror:
            parser.error("ERROR: Can not resolve machine address %s (must be ip{4,6} or hostname)" % machine)

    cmdindex = _build_cmdindex(parser, cmsperf_cmds, machines)
    return (cmsperf_cmds, port, machines, outfile, cmdindex)
225 
226 #################
227 # Request benchmark
228 # Connects to server and returns data
229 # returns: profiling data from server
230 #
def request_benchmark(perfcmds, shost, sport):
    """Submit *perfcmds* to the XML-RPC server at ``http://shost:sport``.

    Calls the server's ``request_benchmark`` method with *perfcmds* and returns
    its reply.  Any error is printed to stdout and the function returns None
    instead of raising.
    """
    try:
        server = xmlrpclib.ServerProxy("http://%s:%s" % (shost, sport))
        return server.request_benchmark(perfcmds)
    except socket.error as detail:
        print("ERROR: Could not communicate with server %s:%s:" % (shost, sport), detail)
    except xml.parsers.expat.ExpatError as detail:
        print("ERROR: XML-RPC could not be parsed:", detail)
    except xmlrpclib.ProtocolError as detail:
        print("ERROR: XML-RPC protocol error", detail, "try using -L xxx:localhost:xxx if using ssh to forward")
    except Exception as detail:
        # Was `except exceptions`, which names a module rather than an exception
        # class: it never matched (Py2) or raised TypeError (Py3).  This branch
        # also covers xmlrpclib.Fault, i.e. errors raised on the server side.
        print("ERROR: There was a runtime error thrown by server %s; detail follows." % shost)
        print(detail)
    return None
244 
245 #################
246 # Worker
247 # This is a subclass of thread that submits commands to the server and stores the result in a thread-safe queue
248 #
class Worker(threading.Thread):
    """Thread that submits one command set to one host and queues the reply.

    On completion the thread puts ``(host, data)`` into *queue*, where *data*
    is whatever ``request_benchmark`` returned (None if it reported an error).
    """

    def __init__(self, host, port, perfcmds, queue):
        # host, port: server to contact; perfcmds: command set to submit;
        # queue: shared queue that receives the (host, data) result.
        self.__perfcmds = perfcmds
        self.__host = host
        self.__port = port
        self.__queue = queue
        threading.Thread.__init__(self)

    def run(self):
        """Request the benchmark and put ``(host, data)`` into the queue."""
        try:
            data = request_benchmark(self.__perfcmds, self.__host, self.__port)
            # Debugging
            print("data is %s" % data)
            print("Puttin it in the queue as (%s,%s)" % (self.__host, data))
            self.__queue.put((self.__host, data))
        except Exception as detail:
            # Builtin Exception instead of the Python-2-only exceptions module;
            # xmlrpclib.Fault is an Exception subclass, so it is still caught.
            print("Exception was thrown when receiving/submitting job information to host", self.__host, ". Exception information:")
            print(detail)
        sys.stdout.flush()
269 
270 ##########################
271 # runclient
272 # Creates a thread for each machine to profile and waits for all machines to return data (you might consider adding a timeout in the while loop)
273 # If the client is killed for some reason or there is an exception, dump the data to a file before throwing the exception
def runclient(perfcmds, hosts, port, outfile, cmdindex):
    """Benchmark every host in parallel and pickle the collected results.

    One Worker thread is started per host; host ``h`` is sent the command set
    ``perfcmds[cmdindex[h]]``.  The function polls every 2 seconds until no
    worker is alive (there is no timeout), then writes the results via
    ``presentBenchmarkData``.  If the wait is interrupted by any exception
    (including KeyboardInterrupt/SystemExit), the results received so far are
    written out before the exception is re-raised.
    """
    queue = Queue.Queue()
    workers = []
    for host in hosts:
        print("Submitting jobs to %s..." % host)
        w = Worker(host, port, perfcmds[cmdindex[host]], queue)
        w.start()
        workers.append(w)
    print("All jobs submitted, waiting for results...")
    sys.stdout.flush()

    try:
        # is_alive(): Thread.isAlive() was removed in Python 3.9.  any() also
        # copes with an empty worker list, where reduce() raised TypeError.
        while any(w.is_alive() for w in workers):
            time.sleep(2.0)
            sys.stdout.flush()
    except BaseException:
        # cleanup: keep whatever has arrived before propagating.
        presentBenchmarkData(queue, outfile)
        raise

    print("All job results received")
    print("The size with the queue containing all data is: %s " % queue.qsize())
    presentBenchmarkData(queue, outfile)
301 
302 ########################################
303 #
304 # Format of the returned data from remote host should be of the form (this could be cleaned up a little bit)
305 #
306 # list of command outputs [ dictionary of cpus { } ]
307 #
308 # For example:
309 # returned data = [ cmd_output1, cmd_output2 ... ]
310 # cmd_output1 = { cpuid1 : cpu_output1, cpuid2 : cpu_output2 ... } # cpuid is "None" if there was only one cpu used
311 # cpu_output1 = { candle1 : profset_output1, candle2 : profset_output2 ... }
312 # profset_output1 = { profset1 : profile_output1, ... }
313 # profile_output1 = { profiletype1: step_output1, ... }
314 # step_output1 = { step1: list_of_cpu_times, ... }
315 # list_of_cpu_times = [ (evt_num1, secs1), ... ]
316 
317 ###########
318 #
319 # We now massage the data
320 #
def presentBenchmarkData(q, outfile):
    """Drain all ``(host, data)`` items from queue *q* and pickle them to *outfile*.

    The pickled object is a list of ``(host, data)`` tuples in the order they
    were taken from the queue.  *outfile* is opened with mode "wb", so an
    existing file at that path is overwritten.
    """
    print("Pickling data to file %s" % outfile)
    out = []  # one (host, data) entry per item taken from the queue
    while not q.empty():
        print("Queue size is still %s" % q.qsize())
        (host, data) = q.get()
        out.append((host, data))
    print("Dumping at screen the output!\n%s" % out)
    # 'with' closes the file even if pickling fails part-way.
    with open(outfile, "wb") as oh:
        pickle.dump(out, oh)
333 
def _main():
    """Script entry point: parse/validate the options, then run the client."""
    parsed = optionparse()
    cmsperf_cmds, port, hosts, outfile, cmdindex = parsed
    runclient(cmsperf_cmds, hosts, port, outfile, cmdindex)


if __name__ == "__main__":
    _main()
def optionparse()
Option parser returns : Command set to run on each (or all) machines, port to connect to server...
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
Worker This is a subclass of thread that submits commands to the server and stores the result in a th...
def presentBenchmarkData(q, outfile)
Format of the returned data from remote host should be of the form (this could be cleaned up a little...
def __init__(self, host, port, perfcmds, queue)
def runclient(perfcmds, hosts, port, outfile, cmdindex)
def request_benchmark(perfcmds, shost, sport)
Request benchmark Connects to server and returns data returns: profiling data from server...
#define str(s)