Asynchronous Requests with Python requests
Note
The answer below is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you can simply replace requests with grequests below and it should work.

I've left this answer as-is to reflect the original question, which was about using requests < v0.13.0.
To do multiple tasks with async.map asynchronously you have to:
- Define a function for what you want to do with each object (your task)
- Add that function as an event hook in your request
- Call async.map on a list of all the requests/actions
Example:
from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print(response.url)

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    #
    # Note the lack of parentheses following do_something; this is
    # because the response will be passed as the first argument automatically
    action_item = async.get(u, hooks={'response': do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)
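On modern requests (v0.13.0+, where the async module no longer exists), a similar fan-out can be approximated with a thread pool from the standard library. This is a rough stand-in, not the original async.map API; fetch below is a placeholder for a real requests.get call so the sketch stays self-contained:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    # Placeholder for a real blocking call such as requests.get(url)
    return "fetched " + url

def do_something(result):
    print(result)

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
]

# Run the blocking fetches concurrently on worker threads
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch, urls))

for r in results:
    do_something(r)
```

pool.map preserves input order, which is the main behavioral difference from a completion-ordered hook callback.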
How to speed up async requests in Python
Bottleneck: number of simultaneous connections
First, the bottleneck is the total number of simultaneous connections in the TCP connector. The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)
The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: you can run ulimit -n to check, and ulimit -n 1024 to increase the limit to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)
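You can also inspect the file-descriptor ceiling from Python itself via the standard resource module (Unix-only; the headroom of 100 below is an arbitrary illustrative choice, not a recommendation from the answer above):

```python
import resource

# The soft limit is what currently applies; the hard limit is the ceiling
# the soft limit can be raised to without privileges.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# Leave some headroom for stdio, DNS lookups, and the event loop itself
suggested_limit = max(100, soft - 100)
print(f"soft={soft}, hard={hard}, suggested connector limit={suggested_limit}")
```

This lets a script pick a connector limit that matches the environment instead of hard-coding one.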
Supporting 50 million requests: async generators
Next, the reason why 50 million requests appear to hang is simply their sheer number. Just creating 10 million coroutines in post_tasks takes 68-98 seconds (this varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.
We can defer the creation of coroutines using an async generator:
async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)
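To see the deferral in action without aiohttp, here is a toy version in which square stands in for do_get and numbers are produced lazily; each coroutine is only created when the generator is advanced, so nothing is materialized up front:

```python
import asyncio

async def make_numbers(n, q):
    # Lazily produce the inputs instead of building a list
    for x in range(n, q):
        yield x

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)  # the coroutine is created here, on demand

async def square(x):
    return x * x

async def main():
    results = []
    async for coro in make_async_gen(square, 0, 5):
        results.append(await coro)
    return results

print(asyncio.run(main()))  # [0, 1, 4, 9, 16]
```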
We need a counterpart to asyncio.as_completed() that handles async_gen and concurrency:
from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())                        # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():                                   # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                                         # -
    #     f.add_done_callback(_on_completion)                # -
    # for _ in range(len(todo)):                             # -
    #     yield _wait_for_one()                              # -
    for _ in range(concurrency):                             # +
        loop.run_until_complete(_add_next())                 # +
    while todo:                                              # +
        yield _wait_for_one()                                # +
Then, we update fetch():
from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                    # -
        # # prepare the coroutines that post                 # -
        # async for x in make_numbers(n, q):                 # -
        #     post_tasks.append(do_get(session, url, x))     # -
        # Prepare the coroutines generator                   # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once                       # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]  # -
        # Now execute them with a specified concurrency      # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +
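As an aside, the same bounded-concurrency behavior can be sketched more simply with a fixed pool of workers pulling from an asyncio.Queue, avoiding the custom as_completed machinery; fake_fetch below is a hypothetical stand-in for do_get so the sketch runs without aiohttp:

```python
import asyncio

async def run_with_concurrency(async_gen, concurrency, handle):
    # A fixed pool of workers pulls coroutines off a bounded queue,
    # so at most `concurrency` requests exist and run at once.
    queue = asyncio.Queue(maxsize=concurrency)
    SENTINEL = object()

    async def worker():
        while True:
            coro = await queue.get()
            if coro is SENTINEL:
                return
            await handle(await coro)

    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    async for coro in async_gen:
        await queue.put(coro)
    for _ in range(concurrency):
        await queue.put(SENTINEL)
    await asyncio.gather(*workers)

async def fake_fetch(x):
    # Hypothetical stand-in for do_get(session, url, x)
    await asyncio.sleep(0)
    return x * 2

async def gen(n):
    for x in range(n):
        yield fake_fetch(x)

async def main():
    out = []
    async def handle(result):
        out.append(result)
    await run_with_concurrency(gen(10), 3, handle)
    return sorted(out)

print(asyncio.run(main()))  # [0, 2, 4, ..., 18]
```

Because results go straight to a handler instead of a list, this shape also sidesteps the memory issue discussed below.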
Other limitations
With the above, the program can start processing 50 million requests, but:
- it will still take around 8 hours with CONCURRENCY = 1000, based on the estimate from tqdm.
- your program may run out of memory holding responses and crash.

For point 2, you should probably do:
# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f

    # Do something with response, such as writing to a local file
    # ...
An error in the code
do_get() should return data:
async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data)  # -
        return data    # +
Python asynchronous requests
If you want multiple concurrent requests plus callbacks, you can use a module like grequests. It has nothing to do with asyncio.

asyncio is all about avoiding callbacks (to escape callback hell) and making asynchronous code as easy to write as synchronous code.

If you decide to try asyncio, you should either use the aiohttp client instead of requests (this is the preferred way) or run requests in a thread pool managed by asyncio. Examples of both ways can be found here.
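The thread-pool route can be sketched with loop.run_in_executor; fetch_sync is a hypothetical stand-in for a blocking requests.get call so the example runs without network access:

```python
import asyncio

def fetch_sync(url):
    # Hypothetical stand-in for requests.get(url).text
    return "body of " + url

async def main():
    loop = asyncio.get_running_loop()
    urls = ["http://example.com/%d" % i for i in range(5)]
    # None -> use the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, fetch_sync, u) for u in urls]
    return await asyncio.gather(*futures)

print(asyncio.run(main()))
```

Each blocking call runs on a worker thread while the event loop stays responsive; gather collects the results in input order.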