Asynchronous method call in Python?
You can use the multiprocessing module added in Python 2.6. You can use pools of processes and then get results asynchronously with:
apply_async(func[, args[, kwds[, callback]]])
E.g.:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=1) # Start a worker processes.
result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.
This is only one alternative. This module provides lots of facilities to achieve what you want. Also it will be really easy to make a decorator from this.
How do I call an async function in a new process using multiprocessing?
Here is how you can call an async function in a new process using multiprocessing.
In this code, each request to /webhook creates a new process, which prints apple every 5 seconds.
from __future__ import annotations
import asyncio
from multiprocessing import Process
from fastapi import FastAPI
app = FastAPI()
process_pool: list[Process] = []
async def print_message(fruit):
print(fruit)
async def loop(fruit):
while True:
await print_message(fruit)
await asyncio.sleep(5)
def run_loop(fruit):
asyncio.run(loop(fruit))
@app.get("/webhook")
async def webhook():
print("WEBHOOK RECEIVED")
fruit = "apple"
process = Process(target=run_loop, args=(fruit,))
process_pool.append(process)
process.start()
print('done')
return 'WEBHOOK RECEIVED'
@app.on_event("shutdown")
async def shutdown_event():
for process in process_pool:
process.kill()
for process in process_pool:
while process.is_alive():
continue
process.close()
if __name__ == '__main__':
print("PROGRAM LAUNCH...")
print("WEBHOOK RECEIVE READY...")
Related Topics
Pandas: Replace Substring in String
Understand the Find() Function in Beautiful Soup
What Is a "Good" Palette for Divergent Colors in R? (Or: Can Viridis and Magma Be Combined Together)
Benchmarks: Does Python Have a Faster Way of Walking a Network Folder
Scripting Http More Effeciently
How to Pickle a Python Function (Or Otherwise Serialize Its Code)
Python Popen Command. Wait Until the Command Is Finished
Subclassing Tuple with Multiple _Init_ Arguments
Why Is "If Not Someobj:" Better Than "If Someobj == None:" in Python
Driving Excel from Python in Windows
How to Change an Image Size in Pygame
Integration Testing for a Web App
Learning Ruby from Python; Differences and Similarities
How to Take Partial Screenshot with Selenium Webdriver in Python