Asynchronous HTTP Requests In Python Using Tornado And AsyncHTTPClient
By Brian Lee
Rationale
We often need to request several remote resources at once, none of which depends on another. The simplest solution is to request them one at a time, but that does not scale well. We could of course use threads or multiple processes, and Python has mechanisms for both, but there is another option.
Tornado
Tornado is a web framework built on non-blocking network I/O. At MutualMind, our primary use for Tornado is our WebSocket server (sockjs-tornado). Tornado's asynchronous API uses generators so that asynchronous code can be written in a style that reads like synchronous code.
Take the following example of synchronous code:
    from tornado.httpclient import HTTPClient

    client = HTTPClient()
    response = client.fetch("http://engineering.mutualmind.com")
    client.close()
Pretty simple. Now imagine that the request takes several seconds. Not a huge problem in itself, but either your user is waiting or your event loop is being blocked. Since Tornado's event loop is single threaded, code that blocks will hold up everything else on the loop. It doesn't even need to be a slow HTTP request: a call to time.sleep() blocks, and a for/while loop that gets out of control can hurt the performance of your event loop just as badly. That is extremely problematic for us because we serve many concurrent users from a single process, and if the whole event loop is blocked, nobody gets updates on time!
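To make the cost of blocking concrete, here is a minimal sketch. It uses the standard library's asyncio as a stand-in for Tornado's IOLoop (an assumption made so the example has no third-party dependencies; Tornado 5 and later run on top of the asyncio event loop). The names count_ticks, blocking_handler and polite_handler are hypothetical. A background "ticker" represents the other users on the loop, and we count how often it gets to run while a handler is busy.

```python
import asyncio
import time


async def count_ticks(handler):
    """Run `handler` while a background ticker counts how often it gets to run."""
    ticks = 0
    finished = False

    async def ticker():
        nonlocal ticks
        while not finished:
            await asyncio.sleep(0.01)  # cooperative pause: yields to the loop
            ticks += 1

    task = asyncio.ensure_future(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    await handler()
    finished = True
    await task  # let the ticker notice the flag and exit
    return ticks


async def blocking_handler():
    time.sleep(0.2)  # blocks the whole thread, and with it the event loop


async def polite_handler():
    await asyncio.sleep(0.2)  # suspends only this coroutine


blocked_ticks = asyncio.run(count_ticks(blocking_handler))
free_ticks = asyncio.run(count_ticks(polite_handler))
print("ticks while blocked:", blocked_ticks)
print("ticks while not blocked:", free_ticks)
```

The ticker barely runs while the blocking handler holds the thread, but keeps ticking throughout the non-blocking one.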
Try Again
AsyncHTTPClient to the rescue for expensive HTTP requests. Since our code is running on the event loop, we can take advantage of Python's yield syntax to pass control to Tornado.
    from tornado import gen, ioloop
    from tornado.httpclient import AsyncHTTPClient

    @gen.coroutine
    def make_request():
        client = AsyncHTTPClient()
        response = yield client.fetch("http://engineering.mutualmind.com")
        # close() is not required

    ioloop.IOLoop.instance().run_sync(make_request)
That looks a little more complicated. Breaking it down, we see slightly different imports, something called gen.coroutine, the yield syntax, and an ioloop instance.
gen.coroutine
tornado.gen is the aforementioned generator-based API. You might have only seen yield as a form of returning, and understood it as an interface for creating 'lazy' iterables. However, generators have the ability to receive data using the send() function. The ability to send and receive data from generators allows a user to suspend execution of a function and resume it later. In simple terms, Tornado's ioloop is just calling send() and next() millions of times to consume your code piece by piece, to create the effect of "pausing" and "resuming" the execution of each function.
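The pause-and-resume mechanism can be seen without Tornado at all. The following is a toy sketch in plain Python, not Tornado's actual implementation: handler and drive are hypothetical names, and the "replies" are canned strings rather than real HTTP responses. The driver plays the role of the event loop, resuming the generator with send() each time a result is ready.

```python
def handler():
    """A generator that 'pauses' at each yield until a result is sent back in."""
    home = yield "GET /home"        # hand a request to the driver, wait for the reply
    about = yield "GET /about"
    return len(home) + len(about)   # delivered to the driver via StopIteration.value


def drive(generator, canned_replies):
    """Toy stand-in for an event loop: feed replies into the generator with send()."""
    seen = []
    try:
        request = next(generator)   # run until the first yield
        while True:
            seen.append(request)
            # Resume the generator; the reply becomes the value of its yield expression.
            request = generator.send(canned_replies[request])
    except StopIteration as stop:
        return stop.value, seen


replies = {"GET /home": "<h1>hi</h1>", "GET /about": "about us"}
result, requests_seen = drive(handler(), replies)
print(result, requests_seen)
```

A real event loop does the same thing, except that it resumes each generator only when the I/O it is waiting on has completed, and it interleaves many generators at once.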
IOLoop
To use gen.coroutine and the yield syntax, the event loop must be running. The run_sync function starts the loop, runs a function on the loop, and ends the loop. This is immensely powerful because it allows us to run asynchronous code in a synchronous context. In the example above, it doesn't make much sense to run a single HTTP request, but what about ten?
    from tornado import gen, ioloop
    from tornado.httpclient import AsyncHTTPClient

    @gen.coroutine
    def make_request():
        client = AsyncHTTPClient()
        # range works on both Python 2 and 3 (xrange is Python 2 only)
        requests = [client.fetch("http://engineering.mutualmind.com")
                    for _ in range(10)]
        # yielding a list of Futures waits for all of them
        responses = yield requests

    ioloop.IOLoop.instance().run_sync(make_request)
That code takes roughly as long as the slowest single request, not the sum of all ten. This is ideal if you have many requests that each take only a few seconds but that would take far too long when run serially. client.fetch returns a Future, which gen.coroutine accepts when yielded. A list of Futures, as in the example above, is also acceptable.
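The timing claim is easy to check with a sketch that needs no network. This again uses the standard library's asyncio rather than Tornado (an assumption, chosen so the example runs anywhere): fake_fetch is a hypothetical stand-in for client.fetch that just sleeps, and asyncio.gather plays the role of yielding a list of Futures.

```python
import asyncio
import time


async def fake_fetch(delay):
    """Hypothetical stand-in for client.fetch(): waits `delay` seconds, returns it."""
    await asyncio.sleep(delay)
    return delay


async def fetch_serially(delays):
    # Each request starts only after the previous one has finished.
    return [await fake_fetch(d) for d in delays]


async def fetch_concurrently(delays):
    # All requests are started up front, then awaited together.
    return await asyncio.gather(*(fake_fetch(d) for d in delays))


def timed(coro):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


delays = [0.1, 0.2, 0.3]
serial_result, serial_time = timed(fetch_serially(delays))
concurrent_result, concurrent_time = timed(fetch_concurrently(delays))
print("serial: %.2fs, concurrent: %.2fs" % (serial_time, concurrent_time))
```

The serial version takes about the sum of the delays, while the concurrent version takes about as long as the longest one. Results come back in request order in both cases.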
Practical Use
As noted earlier, all gen.coroutine needs to work with is a Future object, so how can we apply this to a real-world problem? Currently we use pyelasticsearch for interfacing with ElasticSearch. Unfortunately, there is no official way to run pyelasticsearch asynchronously on Tornado. Fortunately, the developers of pyelasticsearch kept their code relatively orthogonal, so we should be able to stuff some Tornado goodness in there.
Extending pyelasticsearch
Problems
pyelasticsearch.client.ElasticSearch.send_request
After looking at the source code at https://github.com/rhec/pyelasticsearch/blob/master/pyelasticsearch/client.py, this function appears to be the last direct line between your code and ElasticSearch's REST API. Since all the HTTP goes through this function, this would be a great place to return a Future.
pyelasticsearch.client.ElasticSearch._raise_exception
It looks like this function has to introspect the response. This means that we can't simply return the Future created by AsyncHTTPClient.fetch; we must create our own after reading the response.
It is possible this code will need to be called from synchronous Python. We should come up with a way to integrate run_sync...
Solving send_request
Most of the code in this function should remain unchanged. For simplicity, we will leave out the automatic retries and session handling. In my experience with ElasticSearch, neither has been particularly useful anyway. Instead of using the lovely (albeit synchronous) Python requests library to make the request, we will use tornado.httpclient.HTTPRequest.
    from pyelasticsearch import ElasticSearch
    # urlencode and iteritems are taken from six here, as pyelasticsearch's
    # own client module does (assumed; adjust for your environment)
    from six import iteritems
    from six.moves.urllib.parse import urlencode
    from tornado.httpclient import AsyncHTTPClient, HTTPRequest

    class AsyncElasticSearch(ElasticSearch):
        def send_request(self, method, path_components, body='',
                         query_params=None, encode_body=True):
            # NOTE: original comments omitted
            # original code
            path = self._join_path(path_components)
            if query_params:
                path = '?'.join(
                    [path,
                     urlencode(dict((k, self._utf8(self._to_query(v)))
                                    for k, v in iteritems(query_params)))])
            # Assumption: the upstream method encodes the body with
            # self._encode_json; check this against your pyelasticsearch version.
            request_body = self._encode_json(body) if encode_body else body
            # end original code

            client = AsyncHTTPClient()
            server_url, was_dead = self.servers.get()
            url = "%s%s" % (server_url, path)
            request = HTTPRequest(
                url,
                method=method.upper(),
                headers={
                    'Accept': 'application/json',
                    'Content-type': 'application/json',
                },
                connect_timeout=self.timeout,
                request_timeout=self.timeout,
                # required in order to send a request body with GET
                allow_nonstandard_methods=True,
            )
            if body:
                request.body = request_body
            return client.fetch(request)
Notes
To send a body with a GET request you must have allow_nonstandard_methods = True. This parameter is not very well documented.
You might see the HTTP 599 code. This (as far as I know) is a Tornado-specific code and often means that the request timed out (set request_timeout higher), or that you are sending a request body with an HTTP GET and allow_nonstandard_methods is not True.
Setting request_timeout and connect_timeout to the same value is probably not optimal.
What we have now would work for gen.coroutine, although you would have to JSON decode the body yourself. The next step is to decode the response and raise any exceptions.
    @gen.coroutine
    def send_request(self, method, path_components, body='',
                     query_params=None, encode_body=True):
        ...
        if body:
            request.body = request_body
        response = yield client.fetch(request)
        prepped_response = self._decode_response(response.body)
        if response.code >= 400:
            self._raise_exception(response, prepped_response)
        raise gen.Return(prepped_response)
If you compare this with the original source, you will notice that the logging statements have been omitted for brevity. We have also decorated send_request with @gen.coroutine: since we no longer return the Future from AsyncHTTPClient directly, the decorator is what makes the function return a Future of its own. One caveat: by default AsyncHTTPClient.fetch raises an HTTPError for responses with an error status instead of returning them, so the response.code check is only reached if fetch is called with raise_error=False (available in Tornado 4.1 and later).
Now that the first two problems have been solved, the third should be easy. When we are in a synchronous context with no Tornado event loop running, all we need to do is start an event loop and run the coroutines on it. Using the second example as a guide, we come up with the following:
    def concurrent(self, futures):
        """
        Runs requests in parallel on tornado's ioloop.
        """
        @gen.coroutine
        def _inner():
            results = yield futures
            raise gen.Return(results)
        return ioloop.IOLoop.instance().run_sync(_inner)
Example usage:
    aes = AsyncElasticSearch(['localhost:9200'])
    aes.concurrent([
        aes.get_mapping('myindex', 'mydocs'),
        aes.health(),
        aes.search({'query': {'match_all': {}}})
    ])
There you have it, three requests in parallel from typical Python code (no persistent event loop required). Just don't call concurrent from your Tornado code because it will stop the current IOLoop instance! Usage inside Tornado should simply be:
    aes = AsyncElasticSearch(['localhost:9200'])
    results = yield [
        aes.get_mapping('myindex', 'mydocs'),
        aes.health(),
        aes.search({'query': {'match_all': {}}})
    ]






















