FastAPI runs api-calls in serial instead of parallel fashion
As per FastAPI's documentation:

> When you declare a path operation function with normal `def` instead of `async def`, it is run in an external threadpool that is then `await`ed, instead of being called directly (as it would block the server).
also, as described here:

> If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn't have support for using `await` (this is currently the case for most database libraries), then declare your path operation functions as normally, with just `def`.
>
> If your application (somehow) doesn't have to communicate with anything else and wait for it to respond, use `async def`.
>
> If you just don't know, use normal `def`.
>
> Note: You can mix `def` and `async def` in your path operation functions as much as you need and define each one using the best option for you. FastAPI will do the right thing with them.
>
> Anyway, in any of the cases above, FastAPI will still work asynchronously and be extremely fast.
>
> But by following the steps above, it will be able to do some performance optimizations.
Thus, `def` (sync) routes run in a separate thread from a threadpool; or, in other words, the server processes such requests concurrently, whereas `async def` routes run on the main (single) thread, i.e., the server processes the requests sequentially, as long as there is no `await` call to I/O-bound operations inside such routes, such as waiting for data from the client to be sent through the network, contents of a file on the disk to be read, a database operation to finish, etc. (have a look here).

Asynchronous code with `async` and `await` is many times summarised as using coroutines. Coroutines are collaborative (or cooperatively multitasked): "at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended" (see here and here for more info on coroutines). However, this does not apply to CPU-bound operations, such as the ones described here (e.g., audio or image processing, machine learning). CPU-bound operations, even if declared in `async def` functions and called using `await`, will block the main thread (i.e., the event loop). This also means that a blocking operation, such as `time.sleep()`, in an `async def` route will block the entire server (as in your case).
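This cooperative behavior can be demonstrated outside FastAPI with plain asyncio (a minimal sketch; the function names here are made up for illustration): two coroutines that `await asyncio.sleep(1)` overlap and finish in about one second total, whereas replacing the suspension with `time.sleep(1)` serializes them to about two seconds.

```python
import asyncio
import time

async def cooperative():
    await asyncio.sleep(1)  # suspends this coroutine; the event loop runs others

async def blocking():
    time.sleep(1)  # holds the event loop; no other coroutine can run meanwhile

async def timed(*coros):
    start = time.perf_counter()
    await asyncio.gather(*coros)
    return time.perf_counter() - start

t_coop = asyncio.run(timed(cooperative(), cooperative()))  # ~1s: the sleeps overlap
t_block = asyncio.run(timed(blocking(), blocking()))       # ~2s: the sleeps run back to back
print(round(t_coop), round(t_block))
```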
Thus, if your function is not going to make any `async` calls, you could declare it with `def` instead, as shown below:
```python
@app.get("/ping")
def ping(request: Request):
    # print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"
```
Otherwise, if you are going to call `async` functions that you have to `await`, you should use `async def`. To demonstrate this, the example below uses the `asyncio.sleep()` function from the asyncio library. Similar examples are given here and here as well.
```python
import asyncio

@app.get("/ping")
async def ping(request: Request):
    # print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"
```
Both functions above will print the expected output, as mentioned in your question, if two requests arrive at around the same time:

```
Hello
Hello
bye
bye
```
Note: When you call your endpoint for the second (third, and so on) time, please remember to do that from a tab that is isolated from the browser's main session; otherwise, the requests will be shown as coming from the same client (you could check that using `print(request.client)`; the port number would appear to be the same, if both tabs were opened in the same window), and hence, the requests would be processed sequentially. You could either reload the same tab (while the first request is running), or open a new tab in an incognito window, or use another browser/client to send the request.
Async/await and Expensive CPU-bound Operations (Long Computation Tasks)
If you are required to use `async def` (as you might need to `await` for coroutines inside your route), but also have some synchronous CPU-bound task that will block the event loop (essentially, the entire server) and won't let other requests go through, for example:
```python
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this will block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"
```
then:
1. Use more workers (e.g., `uvicorn main:app --workers 4`). Note: Each worker "has its own things, variables and memory". This means that `global` variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (caches), as described here and here. Additionally, "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".

2. Use FastAPI's (Starlette's) `run_in_threadpool()` from the `concurrency` module (source code here and here), as @tiangolo suggested here, which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "`run_in_threadpool` is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".

   ```python
   from fastapi.concurrency import run_in_threadpool

   res = await run_in_threadpool(cpu_bound_task, contents)
   ```

3. Alternatively, use `asyncio`'s `run_in_executor`. Passing `None` as the executor argument, the default executor will be used; that is, `ThreadPoolExecutor`:

   ```python
   loop = asyncio.get_running_loop()
   res = await loop.run_in_executor(None, cpu_bound_task, contents)
   ```

   or, if you would like to pass keyword arguments instead, you can use `functools.partial()`, which is specifically recommended in the documentation for `run_in_executor`:

   ```python
   from functools import partial

   loop = asyncio.get_running_loop()
   res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
   ```

   `ThreadPoolExecutor` will successfully prevent the event loop from being blocked, but won't give you the performance improvement you would expect from running code in parallel. It is thus preferable to run CPU-bound tasks in a separate process, using `ProcessPoolExecutor`, as shown below, which, in this case, you can integrate with `asyncio`, in order to `await` it to finish its work and return the result(s). As described here, on Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc. Basically, your code must be under `if __name__ == '__main__':`.

   ```python
   import concurrent.futures

   loop = asyncio.get_running_loop()
   with concurrent.futures.ProcessPoolExecutor() as pool:
       res = await loop.run_in_executor(pool, cpu_bound_task, contents)
   ```

4. You should also check whether you could change your endpoint's definition to just `def`. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you can instead declare the type of the endpoint's parameter as `bytes` (i.e., `file: bytes = File()`), and thus, FastAPI will read the file for you and you will receive the contents as `bytes`. Hence, there is no need to use `await file.read()`. Please note that this approach should work for small files, as the entire file contents will be stored in memory (see the documentation on `File` Parameters); and hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can't load a 50GB file), your application may end up crashing. Alternatively, you could call the `.read()` method of the `SpooledTemporaryFile` directly (which can be accessed through the `.file` attribute of the `UploadFile` object), so that, again, you don't have to `await` the `.read()` method; and as you can now declare your endpoint with `def`, each request will run in a separate thread (an example is given below). For more details on how to upload a `File`, as well as how Starlette/FastAPI uses `SpooledTemporaryFile` behind the scenes, please have a look at this answer and this answer.

   ```python
   @app.post("/ping")
   def ping(file: UploadFile = File(...)):
       print("Hello")
       try:
           contents = file.file.read()
           res = cpu_bound_task(contents)
       finally:
           file.file.close()
       print("bye")
       return "pong"
   ```

Have a look at the documentation here for more suggested solutions.
How to process requests from multiple users using an ML model and FastAPI?
First, you should not load your model every time a request arrives, but rather have it loaded once at startup (you could use the startup event for this) and store it on the app instance, using the generic `app.state` attribute (see the implementation of `State` too), which you can later retrieve, as described here and here. For instance:
```python
from fastapi import Request

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load(model_path)
```
Second, if you do not have any `async` functions inside your endpoint that you have to `await`, you could define your endpoint with `def` instead of `async def`. In this way, FastAPI will process the requests concurrently, as each request will run in a separate thread; whereas `async def` endpoints run on the main thread, i.e., the server processes the requests sequentially, as long as there is no `await` call to some CPU/I/O-bound (blocking) operation inside such routes. If there is, the keyword `await` would pass function control back to the event loop, thus allowing other tasks/requests in the event loop to run. Please have a look at the answers here and here, as well as all the references included in them, to understand the concept of `async`/`await`, as well as the difference between using `def` and `async def`. Example with a `def` endpoint:
```python
@app.post('/')
def your_endpoint(request: Request):
    model = request.app.state.model
    # run your synchronous ask_query() function here
```
Alternatively, as described here, you could, preferably, run your CPU-bound task in a separate process, using `ProcessPoolExecutor`, and integrate it with `asyncio`, in order to `await` it to finish its work and return the result(s). In this case, you would need to define your endpoint with `async def`, as the `await` keyword only works within an `async` function. Beware that it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc.; that is, your code must be under `if __name__ == '__main__':`. Example:
```python
import asyncio
import concurrent.futures

import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel


class Item(BaseModel):
    text: str
    topN: int


class MyAIClass:
    def ask_query(self, model, query, topN):
        ...  # run the (CPU-bound) query against the model


ai = MyAIClass()
app = FastAPI()


@app.post('/')
async def your_endpoint(item: Item, request: Request):
    model = request.app.state.model  # loaded at startup, as shown earlier
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, ai.ask_query, model, item.text, item.topN)
    return res


if __name__ == '__main__':
    uvicorn.run(app)
```
Note that if you plan on having several workers active at the same time, each worker has its own memory; in other words, workers do not share the same memory. Hence, each worker will load its own instance of the ML model into memory (RAM). If, for instance, you are using four workers for your app, the model will end up being loaded four times into RAM. Thus, if the model, as well as other variables in your code, are consuming a large amount of memory, each process/worker will consume an equivalent amount of memory. If you would like to avoid that, you may have a look at how to share objects across multiple workers; also, if you are using Gunicorn as a process manager with Uvicorn workers, you can use Gunicorn's `--preload` flag. As per the documentation:
> Command line: `--preload`
>
> Default: `False`
>
> Load application code before the worker processes are forked.
>
> By preloading an application you can save some RAM resources as well as speed up server boot times. Although, if you defer application loading to each worker process, you can reload your application code easily by restarting workers.
Example:
```
gunicorn --workers 4 --preload --worker-class=uvicorn.workers.UvicornWorker app:app
```
Note that you cannot combine Gunicorn's `--preload` with the `--reload` flag, as, when the code is preloaded into the master process, the new worker processes, which will automatically be created if your application code has changed, will still have the old code in memory, due to how `fork()` works.
What is the proper way to make downstream Https requests inside of Uvicorn/FastAPI?
Instead of using `requests`, you could use `httpx`, which offers an `async` API as well (`httpx` is also suggested in FastAPI's documentation when performing `async` tests; FastAPI/Starlette also recently replaced the HTTP client on `TestClient` from `requests` with `httpx`).
The example below is based on the one given in `httpx` documentation, demonstrating how to use the library for making an asynchronous HTTP(S) request and, subsequently, streaming the response back to the client. `httpx.AsyncClient()` is what you can use instead of `requests.Session()`, which is useful when several requests are being made to the same host, as the underlying TCP connection will be reused, instead of recreating one for every single request, hence resulting in a significant performance improvement. Additionally, it allows you to reuse `headers` and other settings (such as `proxies` and `timeout`), as well as persist `cookies`, across requests. You spawn a `Client` and reuse it every time you need it. You can use `await client.aclose()` to explicitly close the client once you are done with it (you could do that inside a `shutdown` event handler, for instance). Examples and more details can also be found here.
```python
import httpx
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask

client = httpx.AsyncClient()
app = FastAPI()


@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()


@app.get('/')
async def home():
    req = client.build_request('GET', 'https://www.example.com/')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_text(), background=BackgroundTask(r.aclose))
```
Using the `async` API of `httpx` would mean that you have to define your endpoints with `async def`; otherwise, you would have to use the standard synchronous API (for `def` vs `async def` see this answer), and as described in this GitHub discussion:
> Yes. `HTTPX` is intended to be thread-safe, and yes, a single client-instance across all threads will do better in terms of connection pooling than using an instance-per-thread.
You can also control the connection pool size using the `limits` keyword argument on the `Client` (see Pool limit configuration). For example:
```python
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)
```