![]() |
![]() |
from cStringIO import StringIO
from pycurl import *


class RequestManager:
    """Manager of multiple concurrent or overlapping HTTP requests.

    This is a utility class acting as a pump of several overlapping
    HTTP requests against any number of HTTP or HTTPS servers. It
    uses a configurable number of simultaneous connections, ten by
    default. The actual connection layer is handled using curl, and
    the client classes need to be aware of this to a limited degree:
    every callback receives the pycurl ``Curl`` object of the request.

    The client supplies optional callback methods for initialising,
    responding and handling errors on connections. At the very least
    the request response callback should be defined.

    Callback signatures, as invoked by `process()`:

    - ``request_init(c, *task)``: configure curl handle `c` for `task`.
    - ``request_respond(c)``: the request on `c` completed; the response
      body is in ``c.buffer`` and the originating task in ``c.task``.
    - ``request_error(c, task, errmsg, errno)``: the request failed.

    This class is not designed for multi-threaded use. It employs
    overlapping requests, but in a single thread. Only one thread
    at a time should be calling `process()`; several threads may
    call `.put()` provided the caller uses a mutex so that only one
    thread calls into the method at a time."""

    def __init__(self, num_connections = 10, ssl_opts = None,
                 user_agent = None, request_headers = None,
                 request_init = None, request_respond = None,
                 request_error = None, handle_init = None):
        """Initialise the request manager. The arguments are:

        :arg num_connections: maximum number of simultaneous connections.
        :arg ssl_opts: optional SSLOptions (Monitoring.Core.X509) for SSL
          X509 parameter values, e.g. for X509 client authentication. The
          attributes read are `ca_path`, `cert_file`, `key_file` and
          `key_pass`.
        :arg user_agent: sets user agent identification string if defined.
        :arg request_headers: if defined, specifies list of additional HTTP
          request headers to be added to each request.
        :arg request_init: optional callback to initialise requests; the
          default assumes each task is a one-element tuple ``(url,)`` and
          sets the `URL` property on the curl object to that value.
        :arg request_respond: callback for handling responses; at the very
          minimum this should be defined as the default one does nothing.
        :arg request_error: callback for handling connection errors; the
          default one raises a RuntimeError.
        :arg handle_init: callback for customising connection handles at
          creation time; the callback will be invoked for each connection
          object as it's created and queued to the idle connection list."""
        self.request_respond = request_respond or self._request_respond
        self.request_error = request_error or self._request_error
        self.request_init = request_init or self._request_init
        self.cm = CurlMulti()
        self.handles = [Curl() for i in xrange(0, num_connections)]
        # Idle handles available for new requests; initially all of them.
        self.free = list(self.handles)
        # Tasks waiting for a free handle, in FIFO order.
        self.queue = []

        for c in self.handles:
            c.buffer = None
            c.setopt(NOSIGNAL, 1)
            c.setopt(TIMEOUT, 300)
            c.setopt(CONNECTTIMEOUT, 30)
            c.setopt(FOLLOWLOCATION, 1)
            c.setopt(MAXREDIRS, 5)
            if user_agent:
                c.setopt(USERAGENT, user_agent)
            if ssl_opts:
                c.setopt(CAPATH, ssl_opts.ca_path)
                c.setopt(SSLCERT, ssl_opts.cert_file)
                c.setopt(SSLKEY, ssl_opts.key_file)
                if ssl_opts.key_pass:
                    c.setopt(SSLKEYPASSWD, ssl_opts.key_pass)
            if request_headers:
                c.setopt(HTTPHEADER, request_headers)
            if handle_init:
                handle_init(c)

    def _request_init(self, c, url):
        """Default request initialisation callback: set the URL on `c`."""
        c.setopt(URL, url)

    def _request_error(self, c, task, errmsg, errno):
        """Default request error callback: raise a RuntimeError carrying
        the ``(task, errmsg, errno)`` tuple."""
        raise RuntimeError((task, errmsg, errno))

    def _request_respond(self, *args):
        """Default request response callback: do nothing."""
        pass

    def put(self, task):
        """Add a new task. The task object should be a tuple; its items
        are passed as positional arguments, after the curl handle, to the
        ``request_init`` callback passed to the constructor."""
        self.queue.append(task)

    def process(self):
        """Process pending requests until none are left.

        This method processes all requests queued with `.put()` until they
        have been fully processed. It calls the ``request_respond`` callback
        for all successfully completed requests, and ``request_error`` for
        all failed ones.

        Any new requests added by callbacks by invoking ``put()`` are also
        processed before returning.

        If a callback raises, the exception propagates to the caller. In
        that case every connection handle is returned to the idle list and
        transfers still in flight are cancelled (their tasks are dropped),
        so the manager stays usable; tasks not yet started remain queued."""
        # Handles currently attached to self.cm by this call.
        active = []
        try:
            while self.queue or active:
                self._start_queued(active)

                # Drive all transfers until curl has no immediate work left.
                while True:
                    ret, _ = self.cm.perform()
                    if ret != E_CALL_MULTI_PERFORM:
                        break

                self._reap_completed(active)

                # Wait up to a second for socket activity before pumping.
                self.cm.select(1.)
        finally:
            # Non-empty only when an exception is propagating: detach the
            # in-flight handles so they are not stranded in the multi
            # object and lost from the idle list.
            for c in active:
                self.cm.remove_handle(c)
                self._recycle(c)
            del active[:]

    def _start_queued(self, active):
        """Start queued tasks on idle handles, recording them in `active`."""
        while self.queue and self.free:
            c = self.free.pop()
            started = False
            try:
                c.task = self.queue.pop(0)
                c.buffer = b = StringIO()
                c.setopt(WRITEFUNCTION, b.write)
                self.request_init(c, *c.task)
                self.cm.add_handle(c)
                started = True
            finally:
                # Do not leak the handle if setting up the request failed.
                if not started:
                    self._recycle(c)
            active.append(c)

    def _reap_completed(self, active):
        """Dispatch callbacks for finished transfers and free their handles."""
        while True:
            numq, ok, err = self.cm.info_read()

            for c in ok:
                self.cm.remove_handle(c)
                active.remove(c)
                try:
                    self.request_respond(c)
                finally:
                    # Return the handle even if the callback raised.
                    self._recycle(c)

            for c, errno, errmsg in err:
                self.cm.remove_handle(c)
                active.remove(c)
                # The handle is made available before the callback runs, as
                # the callback may raise; its buffer is left in place so the
                # callback can still inspect any partial response.
                self.free.append(c)
                self.request_error(c, c.task, errmsg, errno)

            if numq == 0:
                break

    def _recycle(self, c):
        """Drop the response buffer of `c` and return it to the idle list."""
        c.buffer = None
        self.free.append(c)