CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/DQMServices/Components/python/HTTP.py

Go to the documentation of this file.
00001 from cStringIO import StringIO
00002 from pycurl import *
00003 
class RequestManager:
  """Manager of multiple concurrent or overlapping HTTP requests.

  This is a utility class acting as a pump of several overlapping
  HTTP requests against any number of HTTP or HTTPS servers. It
  uses a configurable number of simultaneous connections, ten by
  default. The actual connection layer is handled using curl, and
  the client classes need to be aware of this to a limited degree:
  callbacks receive the curl handle, on which this class stores the
  queued task as ``c.task`` and the response body as ``c.buffer``.

  The client supplies optional callback methods for initialising,
  responding and handling errors on connections. At the very least
  the request response callback should be defined.

  This class is not designed for multi-threaded use. It employs
  overlapping requests, but in a single thread. Only one thread
  at a time should be calling `process()`; several threads may
  call `.put()` provided the caller uses a mutex so that only one
  thread calls into the method at a time."""

  def __init__(self, num_connections = 10, ssl_opts = None,
               user_agent = None, request_headers = None,
               request_init = None, request_respond = None,
               request_error = None, handle_init = None):
    """Initialise the request manager. The arguments are:

    :arg num_connections: maximum number of simultaneous connections;
      must be at least one.
    :arg ssl_opts: optional SSLOptions (Monitoring.Core.X509) for SSL
      X509 parameter values, e.g. for X509 client authentication. The
      attributes read are `ca_path`, `cert_file`, `key_file` and
      `key_pass`.
    :arg user_agent: sets user agent identification string if defined.
    :arg request_headers: if defined, specifies list of additional HTTP
      request headers to be added to each request.
    :arg request_init: optional callback to initialise requests, called
      as ``request_init(c, *task)``; the default assumes each task is a
      one-element tuple ``(url,)`` and sets the `URL` property on the
      curl object to that value.
    :arg request_respond: callback for handling responses, called as
      ``request_respond(c)``; at the very minimum this should be
      defined as the default one does nothing.
    :arg request_error: callback for handling connection errors, called
      as ``request_error(c, task, errmsg, errno)``; the default one
      raises a RuntimeError.
    :arg handle_init: callback for customising connection handles at
      creation time; the callback will be invoked for each connection
      object as it's created and queued to the idle connection list.

    :raises ValueError: if `num_connections` is less than one."""
    # Without at least one handle the idle pool is empty forever and
    # process() could never make progress on a queued task.
    if num_connections < 1:
      raise ValueError("num_connections must be at least 1, got %r"
                       % (num_connections,))

    self.request_respond = request_respond or self._request_respond
    self.request_error = request_error or self._request_error
    self.request_init = request_init or self._request_init
    self.cm = CurlMulti()
    self.handles = [Curl() for _ in range(num_connections)]
    # Idle handles; process() pops from here and pushes back when done.
    self.free = list(self.handles)
    # Tasks waiting for an idle handle, served in FIFO order.
    self.queue = []

    for c in self.handles:
      c.buffer = None
      # Keep libcurl from using signals (e.g. for timeouts).
      c.setopt(NOSIGNAL, 1)
      # Limits in seconds: whole transfer, and connection phase only.
      c.setopt(TIMEOUT, 300)
      c.setopt(CONNECTTIMEOUT, 30)
      # Follow HTTP redirects, but no more than five in a row.
      c.setopt(FOLLOWLOCATION, 1)
      c.setopt(MAXREDIRS, 5)
      if user_agent:
        c.setopt(USERAGENT, user_agent)
      if ssl_opts:
        c.setopt(CAPATH, ssl_opts.ca_path)
        c.setopt(SSLCERT, ssl_opts.cert_file)
        c.setopt(SSLKEY, ssl_opts.key_file)
        if ssl_opts.key_pass:
          c.setopt(SSLKEYPASSWD, ssl_opts.key_pass)
      if request_headers:
        c.setopt(HTTPHEADER, request_headers)
      if handle_init:
        handle_init(c)

  def _request_init(self, c, url):
    """Default request initialisation callback: point `c` at `url`."""
    c.setopt(URL, url)

  def _request_error(self, c, task, errmsg, errno):
    """Default request error callback: raise a RuntimeError carrying
    the ``(task, errmsg, errno)`` tuple."""
    raise RuntimeError((task, errmsg, errno))

  def _request_respond(self, *args):
    """Default request response callback: ignore the response."""
    pass

  def put(self, task):
    """Add a new task. The task object should be a tuple; its items
    are unpacked and passed, after the curl handle, to the
    ``request_init`` callback passed to the constructor. With the
    default callback a task is therefore ``(url,)``."""
    self.queue.append(task)

  def process(self):
    """Process pending requests until none are left.

    This method processes all requests queued with `.put()` until they
    have been fully processed. It calls the ``request_respond`` callback
    for all successfully completed requests, and ``request_error`` for
    all failed ones.

    Any new requests added by callbacks by invoking ``put()`` are also
    processed before returning.

    An exception raised by any callback propagates to the caller. The
    handle the callback was working on is returned to the idle pool
    first, and a task whose ``request_init`` failed is discarded. Note
    that the count of in-flight transfers is local to each call, so
    transfers still running when an exception escapes are not waited
    for by a later call."""
    npending = 0
    while self.queue or npending:
      # Hand queued tasks to idle handles until either runs out.
      while self.queue and self.free:
        c = self.free.pop()
        c.task = self.queue.pop(0)
        c.buffer = b = StringIO()
        c.setopt(WRITEFUNCTION, b.write)
        started = False
        try:
          self.request_init(c, *c.task)
          self.cm.add_handle(c)
          started = True
        finally:
          # If the request could not be started, put the handle back
          # so the pool does not shrink; the exception still propagates.
          if not started:
            c.buffer = None
            self.free.append(c)
        npending += 1

      # Drive the transfers for as long as curl asks to be called again.
      while True:
        ret, _ = self.cm.perform()
        if ret != E_CALL_MULTI_PERFORM:
          break

      # Collect completed transfers until curl's message queue is empty.
      while True:
        numq, ok, err = self.cm.info_read()

        for c in ok:
          assert npending > 0
          self.cm.remove_handle(c)
          npending -= 1
          try:
            self.request_respond(c)
          finally:
            # Recycle the handle even if the callback raised, matching
            # the error path below which frees before calling back.
            c.buffer = None
            self.free.append(c)

        for c, errno, errmsg in err:
          assert npending > 0
          self.cm.remove_handle(c)
          self.free.append(c)
          npending -= 1
          self.request_error(c, c.task, errmsg, errno)

        if numq == 0:
          break

      # Wait for socket activity only if transfers are in flight and
      # there is no queued work that could be started straight away.
      if npending and not (self.queue and self.free):
        self.cm.select(1.)
00138