How Could I Use Requests in Asyncio

How could I use requests in asyncio?

To use requests (or any other blocking library) with asyncio, you can use BaseEventLoop.run_in_executor to run a function in another thread and yield from it to get the result. For example:

import asyncio
import requests

@asyncio.coroutine
def main():
    loop = asyncio.get_event_loop()
    future1 = loop.run_in_executor(None, requests.get, 'http://www.google.com')
    future2 = loop.run_in_executor(None, requests.get, 'http://www.google.co.uk')
    response1 = yield from future1
    response2 = yield from future2
    print(response1.text)
    print(response2.text)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

This will get both responses in parallel.
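
The first argument of run_in_executor selects the executor; None means the loop's default ThreadPoolExecutor. If you want to cap the number of worker threads, you can pass your own pool (a small sketch; the pool size of 10 is an arbitrary choice):

import concurrent.futures

# An explicit pool caps how many blocking requests.get calls run at once
pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
future1 = loop.run_in_executor(pool, requests.get, 'http://www.google.com')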

With Python 3.5+ you can use the async/await syntax:

import asyncio
import requests

async def main():
    loop = asyncio.get_event_loop()
    future1 = loop.run_in_executor(None, requests.get, 'http://www.google.com')
    future2 = loop.run_in_executor(None, requests.get, 'http://www.google.co.uk')
    response1 = await future1
    response2 = await future2
    print(response1.text)
    print(response2.text)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

See PEP 492 for more details.
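
On Python 3.7+ you can also skip the explicit loop management with asyncio.run, and fan out several URLs with asyncio.gather (a sketch along the lines of the example above; the URL list is only an example):

import asyncio
import requests

async def main():
    loop = asyncio.get_running_loop()
    urls = ['http://www.google.com', 'http://www.google.co.uk']
    futures = [loop.run_in_executor(None, requests.get, url) for url in urls]
    for response in await asyncio.gather(*futures):
        print(response.text)

asyncio.run(main())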

How to run requests.get asynchronously in Python 3 using asyncio?

except:

It will also catch service exceptions like KeyboardInterrupt and SystemExit. Never do that. Instead, write:

except Exception:
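
For example, in a worker loop (do_work and log_error are hypothetical placeholders), except Exception keeps Ctrl+C working, whereas a bare except would swallow the KeyboardInterrupt as well:

while True:
    try:
        do_work()          # hypothetical blocking task
    except Exception:      # catches ordinary errors...
        log_error()        # ...but lets KeyboardInterrupt/SystemExit propagate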

How to run requests.get asynchronously in Python 3 using asyncio?

requests.get is blocking by nature.

You should either find an async alternative to requests, such as the aiohttp module:

import aiohttp

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

or run requests.get in a separate thread and await that thread asynchronously using loop.run_in_executor:

import asyncio
import requests
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(2)

async def get(url):
    loop = asyncio.get_event_loop()  # the loop running this coroutine
    response = await loop.run_in_executor(executor, requests.get, url)
    return response.text
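
Either version of get() can then be fanned out over several URLs, for example with asyncio.gather (a minimal usage sketch; the URL list is only an example):

import asyncio

async def main():
    urls = ['http://example.com', 'http://example.org']
    pages = await asyncio.gather(*(get(url) for url in urls))
    print([len(page) for page in pages])

asyncio.run(main())  # Python 3.7+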

Asynchronous Requests with Python requests

Note

The below answer is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you could just replace requests with grequests below and it should work.

I've left this answer as is to reflect the original question which was about using requests < v0.13.0.


To do multiple tasks with async.map asynchronously you have to:

  1. Define a function for what you want to do with each object (your task)
  2. Add that function as an event hook in your request
  3. Call async.map on a list of all the requests / actions

Example:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print(response.url)

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    #
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

How to speed up async requests in Python

Bottleneck: number of simultaneous connections

First, the bottleneck is the total number of simultaneous connections in the TCP connector.

The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:

# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:

The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)

The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: You can run ulimit -n to check, and ulimit -n 1024 to increase to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)
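
If you prefer to check or raise the file-descriptor limit from inside the program rather than with ulimit, here is a sketch using the standard resource module (Unix-like systems only; the numbers are examples, not part of the original measurements):

import resource
import aiohttp

# Raise the soft file-descriptor limit, up to the hard limit, before opening connections
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (min(4096, hard), hard))

# Keep the connector limit comfortably below the soft fd limit
connector = aiohttp.TCPConnector(limit=1000)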

Supporting 50 million requests: async generators

Next, the reason why 50 million requests appear to hang is simply their sheer number.

Just creating 10 million coroutines in post_tasks takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.

We can defer the creation of coroutines using an async generator:

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)
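
make_numbers() comes from the question and is not shown here; for a self-contained test, a hypothetical stand-in could be an async generator such as:

async def make_numbers(n, q):
    # Hypothetical stand-in for the question's make_numbers(): yields the ids to request
    for i in range(n, q):
        yield i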

We need a counterpart to asyncio.as_completed() to handle async_gen and concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                           # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):               # -
    #     yield _wait_for_one()                # -
    for _ in range(concurrency):               # +
        loop.run_until_complete(_add_next())   # +
    while todo:                                # +
        yield _wait_for_one()                  # +

Then, we update fetch():

from functools import partial

CONCURRENCY = 200 # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                 # -
        # # prepare the coroutines that post              # -
        # async for x in make_numbers(n, q):              # -
        #     post_tasks.append(do_get(session, url, x))  # -
        # Prepare the coroutines generator                # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once  # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]  # -
        # Now execute them with a specified concurrency  # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +

Other limitations

With the above, the program can start processing 50 million requests but:

  1. it will still take 8 hours or so with CONCURRENCY = 1000, based on the estimate from tqdm.
  2. your program may run out of memory for responses and crash.

For point 2, you should probably do:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f

    # Do something with response, such as writing to a local file
    # ...


An error in the code

do_get() should return data:

async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data)  # -
        return data    # +

Multiple async requests simultaneously

aiohttp with Native Coroutines (async/await)

Here is a typical pattern that accomplishes what you're trying to do. (Python 3.7+.)

One major change is that you will need to move from requests, which is built for synchronous IO, to a package such as aiohttp that is built specifically to work with async/await (native coroutines):

import asyncio
import aiohttp # pip install aiohttp aiodns

async def get(
    session: aiohttp.ClientSession,
    color: str,
    **kwargs
) -> dict:
    url = f"https://api.com/{color}/"
    print(f"Requesting {url}")
    resp = await session.request('GET', url=url, **kwargs)
    # Note that this may raise an exception for non-2xx responses
    # You can either handle that here, or pass the exception through
    data = await resp.json()
    print(f"Received data for {url}")
    return data

async def main(colors, **kwargs):
    # Asynchronous context manager. Prefer this rather
    # than using a different session for each GET request
    async with aiohttp.ClientSession() as session:
        tasks = []
        for c in colors:
            tasks.append(get(session=session, color=c, **kwargs))
        # asyncio.gather() will wait on the entire task set to be
        # completed. If you want to process results greedily as they come in,
        # loop over asyncio.as_completed()
        htmls = await asyncio.gather(*tasks, return_exceptions=True)
        return htmls

if __name__ == '__main__':
    colors = ['red', 'blue', 'green']  # ...
    # Either take colors from stdin or make some default here
    asyncio.run(main(colors))  # Python 3.7+

There are two distinct elements to this, one being the asynchronous aspect of the coroutines and one being the concurrency introduced on top of that when you specify a container of tasks (futures):

  • You create one coroutine get that uses await with two awaitables: the first being .request and the second being .json. This is the async aspect. The purpose of awaiting these IO-bound responses is to tell the event loop that other get() calls can take turns running through that same routine.
  • The concurrent aspect is encapsulated in await asyncio.gather(*tasks). This maps the awaitable get() call to each of your colors. The result is an aggregate list of returned values. Note that this wrapper will wait until all of your responses come in and call .json(). If, alternatively, you want to process them greedily as they are ready, you can loop over asyncio.as_completed: each Future object returned represents the earliest result from the set of the remaining awaitables (a short sketch follows below).
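
A minimal sketch of that greedy variant, reusing the tasks list built in main() above (an illustration, not part of the original answer; error handling is omitted):

# Inside main(), instead of `htmls = await asyncio.gather(*tasks, ...)`:
results = []
for fut in asyncio.as_completed(tasks):
    data = await fut         # the earliest request to finish
    results.append(data)     # process each result as soon as it is ready
return results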

Lastly, take note that asyncio.run() is a high-level "porcelain" function introduced in Python 3.7. In earlier versions, you can mimic it (roughly) like:

# The "full" version makes a new event loop and calls
# loop.shutdown_asyncgens(), see link above
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(colors))
finally:
    loop.close()

Limiting Requests

There are a number of ways to limit the rate of concurrency. For instance, you can use asyncio.Semaphore (see the questions "asyncio.semaphore in async-await function" and "large numbers of tasks with limited concurrency").
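
For example, a minimal sketch with asyncio.Semaphore, wrapping the get() coroutine from above (the limit of 10 is arbitrary; create the semaphore inside the running loop, e.g. in main(), to avoid loop-binding issues on older Python versions):

async def bounded_get(sem, session, color, **kwargs):
    async with sem:  # at most 10 requests are in flight at any moment
        return await get(session=session, color=color, **kwargs)

# Inside main():
#     sem = asyncio.Semaphore(10)
#     for c in colors:
#         tasks.append(bounded_get(sem, session=session, color=c, **kwargs))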

How to create Python3 Asynchronous Web Requests with asyncio & aiohttp?

Couple of important things:

  • Because of the Python interpreter's GIL, your code runs on a single thread, so technically you aren't really running things in parallel.
  • But the catch is that most I/O operations 'hog' time while your CPU sits idle during those periods. That's where libraries like asyncio come to your rescue.
  • They try to ensure minimal CPU idle time by running other tasks in your queue while major I/O operations are awaiting their results.

In your case, update_posts() doesn't really seem like an async method in the ideal sense, because it is technically only used to figure out which posts are to be downloaded and written.

And since we are already discussing downloading and writing, notice that you can actually make them run as independent tasks to ensure minimal downtime.

Here is how I might approach this:

import asyncio
from asyncio import Queue
import aiohttp
import os

async def generate_download_post_tasks(path, queue: Queue):
    # get_myid, maxid, apiurl and posts_dir come from the question's code
    myid = get_myid(path)
    if myid < maxid: # Database can be updated
        for id in range(myid+1, maxid):
            queue.put_nowait((id, path))

async def download_post_tasks(download_queue: Queue, write_queue: Queue):
    async with aiohttp.ClientSession() as session:
        while True:
            download_request_id, path = await download_queue.get()
            async with session.get(f"{apiurl}item/{download_request_id}.json?print=pretty") as resp:
                json = await resp.json()
                content = await resp.text()
                print(f"Done downloading {download_request_id}")
                if json["type"] == "story":
                    write_queue.put_nowait((download_request_id, content, path))

async def write_post_tasks(write_queue: Queue):
    while True:
        post_id, post_content, path = await write_queue.get()
        print(f"Begin writing to {post_id}.json")
        with open(os.path.join(path, f"{post_id}.json"), "w") as file:
            file.write(post_content)
        print(f"Done writing to {post_id}.json")

async def async_main():
    if not os.path.exists(posts_dir):
        os.makedirs(posts_dir)
    path = os.path.join(os.getcwd(), posts_dir)

    tasks = set()
    download_queue = Queue()
    write_queue = Queue()
    tasks.add(asyncio.create_task(generate_download_post_tasks(path=path, queue=download_queue)))
    tasks.add(asyncio.create_task(download_post_tasks(download_queue=download_queue, write_queue=write_queue)))
    tasks.add(asyncio.create_task(write_post_tasks(write_queue=write_queue)))

    wait_time = 100

    try:
        await asyncio.wait_for(asyncio.gather(*tasks), wait_time)
    except Exception:  # e.g. asyncio.TimeoutError from wait_for; avoid a bare except
        # Catch errors
        print("End!!")


if __name__ == '__main__':
    asyncio.run(async_main())

asyncio wait - process results as they come

done and pending are sets of asyncio.Task objects. If you want to get the result of a task or its state, you must take the tasks out of the sets and call the method you need (check the docs). Specifically, you can get the result by invoking the result() method.

import asyncio
from aiohttp import ClientSession

async def main(tasks):
    async with ClientSession() as session:
        # fetch_content_1 is the coroutine from the question; `tasks` is a list of URLs
        pending = [asyncio.create_task(fetch_content_1(session, url)) for url in tasks]
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            res = done.pop().result()
            # do some stuff with the result

Check the documentation for the possible exceptions raised by the result() method and related methods. An exception may occur if the task had an internal error or if the result is not ready (which shouldn't happen in this case).
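
For example, a small sketch that guards result() so one failed task does not stop the processing of the others (an illustration on top of the loop above):

while pending:
    done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        try:
            res = task.result()
        except Exception as exc:   # the task raised internally
            print(f"request failed: {exc}")
        else:
            # do some stuff with the result
            ...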


