CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ws_sso_content_reader.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 ###Description: The tool reads cern web services behind SSO using user certificates
3 import os, urllib, urllib2, httplib, cookielib, sys, HTMLParser, re
4 from optparse import OptionParser
5 
def getFile(path):
    """Expand ``~`` in *path* and follow symlinks on its final component.

    Each link target is read with ``os.readlink``; a relative target is
    resolved against the directory that contains the link.  Only the last
    path component is followed -- symlinked parent directories are left
    untouched and the result is not normalised.

    Returns the first path in the chain that is not a symlink (it may not
    exist).

    Raises OSError (errno ELOOP) when more than ``max_hops`` links have to
    be followed, which is how a cyclic chain shows up; without the bound
    such a chain would spin forever.
    """
    import errno  # local import so the block does not depend on file-level imports

    max_hops = 40  # generous bound; longer chains are treated as loops
    npath = os.path.expanduser(path)
    hops = 0
    while os.path.islink(npath):
        hops += 1
        if hops > max_hops:
            raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), npath)
        target = os.readlink(npath)
        if not os.path.isabs(target):
            # Relative link targets are relative to the link's own directory.
            target = os.path.join(os.path.dirname(npath), target)
        npath = target
    return npath
13 
class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """urllib2 HTTPS handler that opens connections with a client key/cert pair.

    The key and certificate paths are passed through getFile() once, at
    construction time, and the resulting paths are handed to every
    connection this handler creates.
    """

    def __init__(self, key, cert):
        """Store the getFile()-resolved paths of the private *key* and *cert*."""
        urllib2.HTTPSHandler.__init__(self)
        self.key = getFile(key)
        self.cert = getFile(cert)

    def https_open(self, req):
        """Open *req* using getConnection as the connection factory."""
        return self.do_open(self.getConnection, req)

    def getConnection(self, host, timeout=300):
        """Build the HTTPSConnection to *host* carrying the client key/cert.

        *timeout* is forwarded to the connection; previously it was accepted
        but dropped, so a timeout requested by the caller never took effect.
        """
        return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert, timeout=timeout)
25 
def _getResponse(opener, url, post_data=None, debug=False):
    """Open *url* through *opener* (sending *post_data* if given) and return the response.

    With *debug* set, the response's code, headers, msg and url are written
    to stderr, one per line, before the response is returned.
    """
    reply = opener.open(url, post_data)
    if not debug:
        return reply
    report = (
        ("Code: %s\n", "code"),
        ("Headers: %s\n", "headers"),
        ("Msg: %s\n", "msg"),
        ("Url: %s\n", "url"),
    )
    for line_format, attribute in report:
        sys.stderr.write(line_format % getattr(reply, attribute))
    return reply
34 
def getResponseContent(opener, url, post_data=None, debug=False):
    """Fetch *url* via _getResponse and return whatever the response's read() yields."""
    reply = _getResponse(opener, url, post_data, debug)
    return reply.read()
37 
def getResponseURL(opener, url, post_data=None, debug=False):
    """Fetch *url* via _getResponse and return the response's url, percent-decoded."""
    reply = _getResponse(opener, url, post_data, debug)
    return urllib2.unquote(reply.url)
40 
def getParentURL(url):
    """Return ``scheme//host/first-path-segment/`` for *url*.

    The url is split on "/" and pieces 0, 2 and 3 are reassembled, so
    ``https://host/app/page`` becomes ``https://host/app/``.  A url with
    fewer than four "/"-separated pieces raises IndexError.
    """
    pieces = url.split("/", 4)
    scheme = pieces[0]
    host = pieces[2]
    first_segment = pieces[3]
    return scheme + "//" + host + "/" + first_segment + "/"
44 
def getSSOCookie(opener, target_url, cookie, debug=False):
    """Run the form-based SSO handshake for *target_url* through *opener*.

    Steps, all performed with *opener*:
      1. request the parent url of *target_url* and take the url the
         response ends up at;
      2. fetch that page and pull the ``action`` url out of its <form>;
      3. collect every hidden input (name, HTML-unescaped and url-quoted
         value) and POST them to the action url.

    *cookie* is accepted for interface compatibility but is not used here.
    Nothing is returned.

    Raises Exception when the page has no matching <form>, or no hidden
    inputs.  (The second check used to read a flag that was only assigned
    inside the loop, so an input-less page failed with UnboundLocalError
    instead of the intended message.)
    """
    opener.addheaders = [('User-agent', 'curl-sso-certificate/0.0.2')]  # in sync with cern-get-sso-cookie tool
    url = getResponseURL(opener, getParentURL(target_url), debug=debug)
    content = getResponseContent(opener, url, debug=debug)

    form = re.search('<form .+? action="(.+?)">', content)
    if form is None:
        raise Exception("error: The page doesn't have the form with adfs url, check 'User-agent' header")
    url = urllib2.unquote(form.group(1))

    h = HTMLParser.HTMLParser()
    fields = []
    for match in re.finditer('input type="hidden" name="([^"]*)" value="([^"]*)"', content):
        fields.append("%s=%s" % (match.group(1), urllib.quote(h.unescape(match.group(2)))))
    if not fields:
        raise Exception("error: The page doesn't have the form with security attributes, check 'User-agent' header")

    getResponseContent(opener, url, "&".join(fields), debug)
63 
def getContent(target_url, cert_path, key_path, post_data=None, debug=False, adfslogin=None):
    """Return the content of *target_url*, logging in with a client certificate if needed.

    The url is first requested with a plain HTTPS opener (plus an
    ``Adfs-Login`` header when *adfslogin* is given).  If that request
    succeeds and the page does not contain the CERN sign-in text, its
    content is returned as is.

    Otherwise a cookie-aware opener using *key_path* / *cert_path* is built,
    getSSOCookie() is run, and the url is requested again.  After a
    successful getSSOCookie() the function attempts to sign out again
    (best effort).

    Returns the page content, or "" when the authenticated attempt raised;
    in that case the error text is written to stderr.
    """
    opener = urllib2.build_opener(urllib2.HTTPSHandler())
    if adfslogin:
        opener.addheaders = [('Adfs-Login', adfslogin)]  # local version of tc test

    # Try to access the url first.
    try:
        content = getResponseContent(opener, target_url, post_data, debug)
        if 'Sign in with your CERN account' not in content:
            return content
    except Exception:
        # Deliberate best effort: any failure falls through to the SSO attempt.
        if debug:
            sys.stderr.write("The request has an error, will try to create a new cookie\n")

    cookie = cookielib.CookieJar()
    # This opener uses the private key and certificate.
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie), HTTPSClientAuthHandler(key_path, cert_path))
    if debug:
        sys.stderr.write("The return page is sso login page, will request cookie.")
    hasCookie = False
    # If the access gave an exception, try to get a cookie.
    try:
        getSSOCookie(opener, target_url, cookie, debug)
        hasCookie = True
        result = getResponseContent(opener, target_url, post_data, debug)
    except Exception as e:
        result = ""
        # Only stderr is written: the old "print sys.stderr.write(...)" also
        # printed a stray "None" on stdout.
        sys.stderr.write("ERROR:" + str(e))
    if hasCookie:
        burl = getParentURL(target_url)
        try:
            _getResponse(opener, burl + "signOut").read()
            _getResponse(opener, "https://login.cern.ch/adfs/ls/?wa=wsignout1.0").read()
        except Exception:
            # Logout is best effort, but Ctrl-C / SystemExit must still propagate.
            sys.stderr.write("Error, could not logout correctly from server")
    return result
99 
def checkRequiredArguments(opts, parser):
    """Abort via ``parser.error`` when a required option was not supplied.

    An option counts as required when its help text starts with
    ``[REQUIRED]``; it counts as missing when the matching attribute on
    *opts* is None.  The long flags of every missing option are listed in
    the error message.  Options without help text are never required.

    Returns None when nothing is missing.
    """
    missing_options = []
    for option in parser.option_list:
        # help is None for options declared without help text.
        if not option.help or not option.help.startswith('[REQUIRED]'):
            continue
        # Plain attribute lookup; no need to eval a string built from option.dest.
        if getattr(opts, option.dest) is None:
            missing_options.extend(option._long_opts)
    if missing_options:
        parser.error('Missing REQUIRED parameters: %s' % str(missing_options))
107 
if __name__ == "__main__":
    # Command-line entry point: fetch the url given by -u using the
    # certificate/key given by -c/-k and print the content to stdout.
    # The usage line lists only options that are actually defined below.
    parser = OptionParser(usage="%prog [-d(ebug)] [-p(ostdata) DATA] -c(ert) CERN-PEM -k(ey) CERT-KEY -u(rl) URL")
    parser.add_option("-d", "--debug", dest="debug", help="Enable debugging. Prints response code, headers, message and url to stderr.", action="store_true", default=False)
    parser.add_option("-p", "--postdata", dest="postdata", help="Data to be sent as post request", action="store", default=None)
    parser.add_option("-c", "--cert", dest="cert_path", help="[REQUIRED] Absolute path to cert file.", action="store")
    parser.add_option("-k", "--key", dest="key_path", help="[REQUIRED] Absolute path to key file.", action="store")
    parser.add_option("-u", "--url", dest="url", help="[REQUIRED] Url to a service behind the SSO", action="store")
    (opts, args) = parser.parse_args()
    # Exits through parser.error() if any [REQUIRED] option is missing.
    checkRequiredArguments(opts, parser)
    content = getContent(opts.url, opts.cert_path, opts.key_path, opts.postdata, opts.debug)
    # Parenthesised form prints the same text under the Python 2 print statement.
    print(content)