Fastapi Runs API-Calls in Serial Instead of Parallel Fashion

FastAPI runs api-calls in serial instead of parallel fashion

As per FastAPI's documentation:

When you declare a path operation function with normal def instead
of async def, it is run in an external threadpool that is then
awaited
, instead of being called directly (as it would block the
server).

also, as described here:

If you are using a third party library that communicates with
something (a database, an API, the file system, etc.) and doesn't have
support for using await, (this is currently the case for most
database libraries), then declare your path operation functions as
normally, with just def.

If your application (somehow) doesn't have to communicate with
anything else and wait for it to respond, use async def.

If you just don't know, use normal def.

Note: You can mix def and async def in your path operation functions as much as you need and define each one using the best
option for you. FastAPI will do the right thing with them.

Anyway, in any of the cases above, FastAPI will still work
asynchronously
and be extremely fast.

But by following the steps above, it will be able to do some
performance optimizations.

Thus, def (sync) routes run in a separate thread from a threadpool, or, in other words, the server processes the requests concurrently, whereas async def routes run on the main (single) thread, i.e., the server processes the requests sequentially - as long as there is no await call to I/O-bound operations inside such routes, such as waiting for data from the client to be sent through the network, contents of a file in the disk to be read, a database operation to finish, etc. - have a look here. Asynchronous code with async and await is many times summarised as using coroutines. Coroutines are collaborative (or cooperatively multitasked): "at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended" (see here and here for more info on coroutines). However, this does not apply to CPU-bound operations, such as the ones described here (e.g., audio or image processing, machine learning). CPU-bound operations, even if declared in async def functions and called using await, will block the main thread (i.e., the event loop). This also means that a blocking operation, such as time.sleep(), in an async def route will block the entire server (as in your case).

Thus, if your function is not going to make any async calls, you could declare it with def instead, as shown below:

@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"

Otherwise, if you are going to call async functions that you have to await, you should use async def. To demonstrate this, the below uses asyncio.sleep() function from the asyncio library. Similar example is given here and here as well.

import asyncio

@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"

Both the functions above will print the expected output - as mentioned in your question - if two requests arrive at around the same time.

Hello
Hello
bye
bye

Note: When you call your endpoint for the second (third, and so on) time, please remember to do that from a tab that is isolated from the browser's main session; otherwise, the requests will be shown as coming from the same client (you could check that using print(request.client) - the port number would appear being the same, if both tabs were opened in the same window), and hence, the requests would be processed sequentially. You could either reload the same tab (as is running), or open a new tab in an incognito window, or use another browser/client to send the request.

Async/await and Expensive CPU-bound Operations (Long Computation Tasks)

If you are required to use async def (as you might need to await for coroutines inside your route), but also have some synchronous CPU-bound task that will block the event loop (essentially, the entire server) and won't let other requests to go through, for example:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"

then:

  1. Use more workers (e.g., uvicorn main:app --workers 4). Note: Each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".

  2. Use FastAPI's (Starlette's) run_in_threadpool() from concurrency module (source code here and here) - as @tiangolo suggested here - which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "run_in_threadpool is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".

    from fastapi.concurrency import run_in_threadpool

    res = await run_in_threadpool(cpu_bound_task, contents)
  3. Alternatively, use asyncio's run_in_executor. Passing None as the executor argument, the default executor will be used; that is ThreadPoolExecutor:

    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, cpu_bound_task, contents)

    or, if you would like to pass keyword arguments instead, you can use functools.partial(), which is specifically recommended in the documentation for run_in_executor:

    from functools import partial

    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
  4. ThreadPoolExecutor will successfully prevent the event loop from being blocked, but won't give you the performance improvement you would expect from running code in parallel. It is thus preferable to run CPU-bound tasks in a separate process—using ProcessPoolExecutor, as shown below—which, in this case, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, on Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__':.

    import concurrent.futures

    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
  5. You should also check whether you could change your endpoint's definition to just def. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you can instead declare the type of the endpoint's parameter as bytes (i.e., file: bytes = File()) and thus, FastAPI will read the file for you and you will receive the contents as bytes. Hence, no need to use await file.read(). Please note that this approach should work for small files, as the enitre file contents will be stored into memory (see the documentation on File Parameters); and hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can’t load a 50GB file), your application may end up crashing. Alternatively, you could call the .read() method of the SpooledTemporaryFile directly (which can be accessed through the .file attribute of the UploadFile object), so that again you don't have to await the .read() method—and as you can now declare your endpoint with def, each request will run in a separate thread (example is given below). For more details on how to upload a File, as well how Starlette/FastAPI uses SpooledTemporaryFile behind the scenes, please have a look at this answer and this answer.

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
    contents = file.file.read()
    res = cpu_bound_task(contents)
    finally:
    file.file.close()
    print("bye")
    return "pong"
  6. Have a look at the documentation here for more suggested solutions.

How to process requests from multiiple users using ML model and FastAPI?

First, you should rather not load your model every time a request arrives, but rahter have it loaded once at startup (you could use the startup event for this) and store it on the app instance—using the generic app.state attribute (see implementation of State too)—which you can later retrieve, as described here and here. For instance:

from fastapi import Request

@app.on_event("startup")
async def startup_event():
app.state.model = torch.load(model_path)

Second, if you do not have any async functions inside your endpoint that you have to await, you could define your endpoint with def instead of async def. In this way, FastAPI will process the requests concurrently, as each request will run in a separate thread; whereas, async def endpoints run on the main thread, i.e., the server processes the requests sequentially, as long as there is no await call to some CPU/IO-bound (blocking) operation inside such routes. If so, the keyword await would pass function control back to the event loop, thus allowing other tasks/requests in the event loop to run. Please have a look at the answers here and here, as well as all the references included in them, to understand the concept of async/await, as well as the difference between using def and async def. Example with def endpoint:

@app.post('/')
def your_endpoint(request: Request):
model = request.app.state.model
# run your synchronous ask_query() function here

Alternatively, as described here, you could, preferably, run your CPU-bound task in a separate process, using ProcessPoolExecutor, and integrate with asyncio, in order to await it to finish its work and return the result(s)—in this case, you would need to define your endpoint with async def, as the await keyword only works within an async function. Beware that it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc.; that is, your code must be under if __name__ == '__main__'. Example:

from fastapi import FastAPI, Request
import concurrent.futures
import asyncio
import uvicorn

class MyAIClass():
def __init__(self) -> None:
super().__init__()

def ask_query(self, model, query, topN):
# ...

ai = MyAIClass()
app = FastAPI()

@app.post('/')
async def your_endpoint(request: Request):
model = request.app.state.model

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, ai.ask_query, model, item.text, item.topN)

if __name__ == '__main__':
uvicorn.run(app)

Note that if you plan on having several workers active at the same time, each worker has its own memory—in other words, workers do not share the same memory—and hence, each worker will load their own instance of the ML model into memory (RAM). If, for instance, you are using four workers for your app, the model will result in being loaded four times into RAM. Thus, if the model, as well as other variables in your code, are consuming a large amount of memory, each process/worker will consume an equivalent amount of memory. If you would like to avoid that, you may have a look at how to share objects across multiple workers, as well as—if you are using Gunicorn as a process manager with Uvicorn workers—you can use Gunicorn's --preload flag. As per the documentation:

Command line: --preload

Default: False

Load application code before the worker processes are forked.

By preloading an application you can save some RAM resources as well
as speed up server boot times. Although, if you defer application
loading to each worker process, you can reload your application code
easily by restarting workers.

Example:

gunicorn --workers 4 --preload --worker-class=uvicorn.workers.UvicornWorker app:app

Note that you cannot combine Gunicorn's --preload with --reload flag, as when the code is preloaded into the master process, the new worker processes—which will automatically be created, if your application code has changed—will still have the old code in memory, due to how fork() works.

What is the proper way to make downstream Https requests inside of Uvicorn/FastAPI?

Instead of using requests, you could use httpx, which offers an async API as well (httpx is also suggested in FastAPI's documentation when performing async tests, as well as FastAPI/Starlette recently replaced the HTTP client on TestClient from requests to httpx).

The below example is based on the one given in httpx documentation, demonstrating how to use the library for making an asynchronous HTTP(s) request, and subsequently, streaming the response back to the client. The httpx.AsyncClient() is what you can use instead of requests.Session(), which is useful when several requests are being made to the same host, as the underlying TCP connection will be reused, instead of recreating one for every single request—hence, resulting in a significant performance improvement. Additionally, it allows you to reuse headers and other settings (such as proxies and timeout), as well as persist cookies, across requests. You spawn a Client and reuse it every time you need it. You can use await client.aclose() to explicitly close the client once you are done with it (you could do that inside a shutdown event handler, for instance). Examples and more details can also be found here.

from fastapi import FastAPI
import httpx
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse

client = httpx.AsyncClient()
app = FastAPI()

@app.on_event('shutdown')
async def shutdown_event():
await client.aclose()

@app.get('/')
async def home():
req = client.build_request('GET', 'https://www.example.com/')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_text(), background=BackgroundTask(r.aclose))

Using the async API of httpx would mean that you have to define your endpoints with async def; otherwise, you would have to use the standard synchronous API (for def vs async def see this answer), and as described in this github discussion:

Yes. HTTPX is intended to be thread-safe, and yes, a single
client-instance across all threads will do better in terms of
connection pooling, than using an instance-per-thread.

You can also control the connection pool size using the limits keyword argument on the Client (see Pool limit configuration). For example:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)


Related Topics



Leave a reply



Submit