1 from builtins
import range
2 from cStringIO
import StringIO
6 """Manager of multiple concurrent or overlapping HTTP requests. 8 This is a utility class acting as a pump of several overlapping 9 HTTP requests against any number of HTTP or HTTPS servers. It 10 uses a configurable number of simultaneous connections, ten by 11 default. The actual connection layer is handled using curl, and 12 the client classes need to aware of this to a limited degree. 14 The client supplies optional callback methods for initialising, 15 responding and handling errors on connections. At the very least 16 the request response callback should be defined. 18 This class is not designed for multi-threaded use. It employs 19 overlapping requests, but in a single thread. Only one thread 20 at a time should be calling `process()`; several threads may 21 call `.put()` provided the caller uses a mutex so that only one 22 thread calls into the method at a time.""" 24 def __init__(self, num_connections = 10, ssl_opts = None,
25 user_agent =
None, request_headers =
None,
26 request_init =
None, request_respond =
None,
27 request_error =
None, handle_init =
None):
28 """Initialise the request manager. The arguments are: 30 :arg num_connections: maximum number of simultaneous connections. 31 :arg ssl_opts: optional SSLOptions (Monitoring.Core.X509) for SSL 32 X509 parametre values, e.g. for X509 client authentication. 33 :arg user_agent: sets user agent identification string if defined. 34 :arg request_headers: if defined, specifies list of additional HTTP 35 request headers to be added to each request. 36 :arg request_init: optional callback to initialise requests; the 37 default assumes each task is a URL to access and sets the `URL` 38 property on the curl object to the task value. 39 :arg request_respond: callback for handling responses; at the very 40 minimum this should be defined as the default one does nothing. 41 :arg request_error: callback for handling connection errors; the 42 default one raises a RuntimeException. 43 :arg handle_init: callback for customising connection handles at 44 creation time; the callback will be invoked for each connection 45 object as it's created and queued to the idle connection list.""" 50 self.
handles = [Curl()
for i
in range(0, num_connections)]
57 c.setopt(TIMEOUT, 300)
58 c.setopt(CONNECTTIMEOUT, 30)
59 c.setopt(FOLLOWLOCATION, 1)
60 c.setopt(MAXREDIRS, 5)
62 c.setopt(USERAGENT, user_agent)
64 c.setopt(CAPATH, ssl_opts.ca_path)
65 c.setopt(SSLCERT, ssl_opts.cert_file)
66 c.setopt(SSLKEY, ssl_opts.key_file)
68 c.setopt(SSLKEYPASSWD, ssl_opts.key_pass)
70 c.setopt(HTTPHEADER, request_headers)
75 """Default request initialisation callback.""" 79 """Default request error callback.""" 80 raise RuntimeError((task, errmsg, errno))
83 """Default request response callback.""" 87 """Add a new task. The task object should be a tuple and is 88 passed to ``request_init`` callback passed to the constructor.""" 89 self.queue.append(task)
92 """Process pending requests until none are left. 94 This method processes all requests queued with `.put()` until they 95 have been fully processed. It calls the ``request_respond`` callback 96 for all successfully completed requests, and ``request_error`` for 99 Any new requests added by callbacks by invoking ``put()`` are also 100 processed before returning.""" 102 while self.
queue or npending:
105 c.task = self.queue.pop(0)
106 c.buffer = b = StringIO()
107 c.setopt(WRITEFUNCTION, b.write)
109 self.cm.add_handle(c)
113 ret, nhandles = self.cm.perform()
114 if ret != E_CALL_MULTI_PERFORM:
118 numq, ok, err = self.cm.info_read()
122 self.cm.remove_handle(c)
128 for c, errno, errmsg
in err:
130 self.cm.remove_handle(c)
def _request_init(self, c, url)
def _request_error(self, c, task, errmsg, errno)
def _request_respond(self, args)
def __init__(self, num_connections=10, ssl_opts=None, user_agent=None, request_headers=None, request_init=None, request_respond=None, request_error=None, handle_init=None)