Asynchronous Requests With Python Requests

Asynchronous Requests with Python requests

Note

The below answer is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you could just replace requests with grequests below and it should work.

I've left this answer as is to reflect the original question which was about using requests < v0.13.0.


To do multiple tasks with async.map asynchronously you have to:

  1. Define a function for what you want to do with each object (your task)
  2. Add that function as an event hook in your request
  3. Call async.map on a list of all the requests / actions

Example:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
'http://python-requests.org',
'http://httpbin.org',
'http://python-guide.org',
'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
# The "hooks = {..." part is where you define what you want to do
#
# Note the lack of parentheses following do_something, this is
# because the response will be used as the first argument automatically
action_item = async.get(u, hooks = {'response' : do_something})

# Add the task to our list of things to do via async
async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

How to speed up async requests in Python

Bottleneck: number of simultaneous connections

First, the bottleneck is the total number of simultaneous connections in the TCP connector.

That default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:

# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:

The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)

The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: You can run ulimit -n to check, and ulimit -n 1024 to increase to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)

Supporting 50 million requests: async generators

Next, the reason why 50 million requests appears to hang is simply because of its sheer number.

Just creating 10 million coroutines in post_tasks takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.

We can defer the creation of coroutines using an async generator:

async def make_async_gen(f, n, q):
async for x in make_numbers(n, q):
yield f(x)

We need a counterpart to asyncio.as_completed() to handle async_gen and concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
done = Queue()
loop = events.get_event_loop()
# todo = {ensure_future(f, loop=loop) for f in set(fs)} # -
todo = set() # +

def _on_completion(f):
todo.remove(f)
done.put_nowait(f)
loop.create_task(_add_next()) # +

async def _wait_for_one():
f = await done.get()
return f.result()

async def _add_next(): # +
try:
f = await fs_async_gen.__anext__()
except StopAsyncIteration:
return
f = ensure_future(f, loop=loop)
f.add_done_callback(_on_completion)
todo.add(f)

# for f in todo: # -
# f.add_done_callback(_on_completion) # -
# for _ in range(len(todo)): # -
# yield _wait_for_one() # -
for _ in range(concurrency): # +
loop.run_until_complete(_add_next()) # +
while todo: # +
yield _wait_for_one() # +

Then, we update fetch():

from functools import partial

CONCURRENCY = 200 # +

n = 0
q = 50_000_000

async def fetch():
# example
url = "https://httpbin.org/anything/log?id="

async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
# post_tasks = [] # -
# # prepare the coroutines that post # -
# async for x in make_numbers(n, q): # -
# post_tasks.append(do_get(session, url, x)) # -
# Prepare the coroutines generator # +
async_gen = make_async_gen(partial(do_get, session, url), n, q) # +

# now execute them all at once # -
# responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))] # -
# Now execute them with a specified concurrency # +
responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)] # +

Other limitations

With the above, the program can start processing 50 million requests but:

  1. it will still take 8 hours or so with CONCURRENCY = 1000, based on the estimate from tqdm.
  2. your program may run out of memory for responses and crash.

For point 2, you should probably do:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
response = await f

# Do something with response, such as writing to a local file
# ...


An error in the code

do_get() should return data:

async def do_get(session, url, x):
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'Access-Control-Allow-Origin': "*",
'Accept-Encoding': "gzip, deflate",
'Accept-Language': "en-US"
}

async with session.get(url + str(x), headers=headers) as response:
data = await response.text()
# print(data) # -
return data # +

python asynchronous requests

If you want multiple concurrent requests + callbacks you can use module like grequests. It has nothing to do with asyncio.

asyncio - is all about to avoid using of callbacks (to avoid callback hell) and make writing of asynchronous code as easy as synchronous one.

If you decide to try asyncio you should either use aiohttp client instead of requests (this is preferred way) or run requests in thread pool managed by asyncio. Example of both ways can be found here.



Related Topics



Leave a reply



Submit