How to Add a Background Thread to Flask

How can I add a background thread to flask?

Your additional threads must be initiated from the same app that is called by the WSGI server.

The example below creates a background thread that executes every 5 seconds and manipulates data structures that are also available to Flask routed functions.

import threading
import atexit
from flask import Flask

POOL_TIME = 5 #Seconds

# variables that are accessible from anywhere
commonDataStruct = {}
# lock to control access to variable
dataLock = threading.Lock()
# thread handler
yourThread = threading.Thread()

def create_app():
app = Flask(__name__)

def interrupt():
global yourThread
yourThread.cancel()

def doStuff():
global commonDataStruct
global yourThread
with dataLock:
pass
# Do your stuff with commonDataStruct Here

# Set the next thread to happen
yourThread = threading.Timer(POOL_TIME, doStuff, ())
yourThread.start()

def doStuffStart():
# Do initialisation stuff here
global yourThread
# Create your thread
yourThread = threading.Timer(POOL_TIME, doStuff, ())
yourThread.start()

# Initiate
doStuffStart()
# When you kill Flask (SIGTERM), clear the trigger for the next thread
atexit.register(interrupt)
return app

app = create_app()

Call it from Gunicorn with something like this:

gunicorn -b 0.0.0.0:5000 --log-config log.conf --pid=app.pid myfile:app

flask application with background threads

The best thing to do for stuff like this is use a message broker. There is some excellent software in the python world meant for doing just this:

  • Celery (http://www.celeryproject.org/), and
  • RQ (http://python-rq.org/).

Both are excellent choices.

It's almost never a good idea to spawn a thread the way you're doing it, as this can cause issues processing incoming requests, among other things.

If you take a look at the celery or RQ getting started guides, they'll walk you through doing this the proper way!

Is there a way to run a function in the background of a python flask app?

Thank you for your suggestions, as they definitely helped me research the topic, but I finally figured out how to get threading working.

Turns out my issue was returning the value to a list for the main program to use.

By providing the list notes_pressed as an argument to get_next_note(), it ensures that the list is updated when return_pressed_notes() is called.

Here is the code I used to solve my issue:

from Flask import Flask
import mido
from threading import Thread

app = Flask(__name__)

# List to store pressed keys
notes_pressed = []

@app.route('/get_notes', methods=['GET'])
def return_pressed_notes():
return json.dumps(notes_pressed)

# Function to translate midi key numbers to note letters
def translate_key(key_num):
...

# Function that returns recently played note
def get_next_note(notes_pressed):
# Open port to listen for note presses
with mido.open_input() as inport:
# Retreive key presses from port
for msg in inport:
# If key press is valid
# - note is pressed
# - velocity!=0 (prevents ghost notes)
if (msg.type=='note_on' and msg.velocity!=0 and msg.channel==0):
# Add new note to list
notes_pressed.append(translate_key(msg.note - lowest_key))

# Run main program
if __name__ == '__main__':
# NEW CODE
p = Thread(target=get_next_note, args=(notes_pressed,))
p.start()
app.run(debug=True, use_reloader=False)
p.join()

Python Flask background threading is not working: code is not being executed

As discussed in comments, here's an oversimplified example of an event-driven system with RabbitMQ and Flask.

Dependencies you need:

(flask-rabbitmq) ➜  flask-rabbitmq pip freeze
click==8.0.3
Flask==2.0.2
itsdangerous==2.0.1
Jinja2==3.0.3
MarkupSafe==2.0.1
pika==1.2.0
Werkzeug==2.0.2

Try to create a RabbitMQ docker container with the command below:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

your simple flask app would look like this:

from flask import Flask
from flask import request
import json
from RabbitMQPublisher import publish

app = Flask(__name__)

@app.route('/publish', methods=['POST'])
def index():
if request.method == 'POST':
publish(json.dumps(request.json))
return 'Done'


if __name__ == '__main__':
app.run()

your RabbitMQPublisher.py would be like this:

import pika

def publish(data: str):
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='0.0.0.0', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test', exchange_type='fanout')
channel.queue_declare(queue='', exclusive=True)
channel.basic_publish(exchange='test', routing_key='', body='{"data": %s}'%(data))
connection.close()

and finally your script.py would be like this:

import pika
import json
from time import sleep

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='0.0.0.0', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)

channel.queue_bind(exchange='test', queue='')

def callback(ch, method, properties, body):
body = json.loads(body.decode().replace("'", '"'))
print(body)


channel.basic_consume(
queue='', on_message_callback=callback, auto_ack=True)

channel.start_consuming()

inside the callback function in the code above, you can specify you're logic and when your logic finishes you can again use RabbitMQ to call a pattern on flask side or event do http call with requests. that would be your choice.



Related Topics



Leave a reply



Submit