Python3 Flask Asyncio Subprocess in Route Hangs

Python3 Flask asyncio subprocess in route hangs

There are several concerns here:

  • You are creating a new event loop once, at import time, but then close that loop in your view. There is no need to close the loop at all; worse, because the loop is now closed, every subsequent request will fail.

  • The asyncio event loop is not thread safe and should not be shared between threads. The vast majority of Flask deployments use threads to handle incoming requests. Your code carries echoes of how this should be handled instead, but unfortunately it is not the correct approach: asyncio.get_child_watcher().attach_loop(eventLoop) is mostly redundant because eventLoop = asyncio.new_event_loop(), if run on the main thread, already does exactly that.

    This is the main candidate for the issues you are seeing.

  • Your code assumes that the executable is present and actually executable. You should be handling OSError exceptions (and subclasses): an unqualified s.py will only work if the file is made executable, starts with a #! shebang line, and is found on the PATH. It won't work just because it is in the same directory, nor would you want to rely on the current working directory anyway.

  • Your code assumes that the process closes stdout at some point. If the subprocess never closes stdout (something that happens automatically when the process exits), then your async for line in process.stdout: loop will wait forever too. Consider adding timeouts to avoid getting blocked on a faulty subprocess; see the sketch after this list.
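As a minimal sketch of that last point, assuming process is an asyncio subprocess with a piped stdout, each read can be wrapped in asyncio.wait_for(); the 10-second limit is an arbitrary choice for illustration:

import asyncio

async def read_with_timeout(process, timeout=10.0):
    try:
        while True:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
            if not line:  # readline() returns b"" once stdout is closed
                break
            print(line.decode(), flush=True)
    except asyncio.TimeoutError:
        process.kill()  # the subprocess stalled, stop waiting for it
    return await process.wait()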

There are two sections in the Python asyncio documentation that you really want to read when using asyncio subprocesses in a multi-threaded application:

  • The Concurrency and Multithreading section, which explains that almost all asyncio objects are not thread safe. You don't want to add tasks to the loop from other threads directly; you want to either use an event loop per thread, or use the asyncio.run_coroutine_threadsafe() function to run a coroutine on a loop in a specific thread (a brief sketch of that function follows this list).

  • For Python versions up to 3.7, you also need to read the Subprocess and Threads section, because up until that version asyncio uses a non-blocking os.waitpid(-1, os.WNOHANG) call to track child state and relies on signal handling (which can only be done on the main thread). Python 3.8 removed this restriction by adding a new child watcher implementation that uses a blocking per-process os.waitpid() call in a separate thread, at the expense of extra memory.

    You don't have to stick to the default child watcher strategy, however. You can call EventLoopPolicy.set_child_watcher(), passing in a different process watcher instance. In practice that means backporting the 3.8 ThreadedChildWatcher implementation.
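As a quick, self-contained illustration of asyncio.run_coroutine_threadsafe(), here is a toy version of the loop-in-a-thread pattern (the full implementation further down is what you'd actually use):

import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def answer():
    return 42

# schedule the coroutine on the loop owned by the other thread
future = asyncio.run_coroutine_threadsafe(answer(), loop)
print(future.result(timeout=5))  # blocks this thread, not the loop

loop.call_soon_threadsafe(loop.stop)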

For your use case, there really is no need to run a new event loop per thread. Run a single loop, in a separate thread, as needed. If you use a loop in a separate thread then, depending on your Python version, you may need to have a running loop on the main thread as well, or use a different process watcher. Generally speaking, running an asyncio loop on the main thread in a WSGI server is not going to be easy or even possible.

So you need to run a loop, permanently, in a separate thread, and you need to use a child process watcher that works without a main-thread loop. Here is an implementation of just that; it should work for Python versions 3.6 and newer:

import asyncio
import itertools
import logging
import time
import threading

try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        def __init__(self):
            self._pid_counter = itertools.count(0)
            self._threads = {}

        def is_active(self):
            return True

        def close(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __del__(self, _warn=warnings.warn):
            threads = [t for t in list(self._threads.values()) if t.is_alive()]
            if threads:
                _warn(
                    f"{self.__class__} has registered but not finished child processes",
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(self, pid, callback, *args):
            loop = _get_running_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f"waitpid-{next(self._pid_counter)}",
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid):
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop):
            pass

        def _do_waitpid(self, loop, expected_pid, callback, args):
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255", pid
                )
            else:
                if os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                else:
                    returncode = status

                if loop.get_debug():
                    logger.debug(
                        "process %s exited with returncode %s", expected_pid, returncode
                    )

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

logger = logging.getLogger(__name__)

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread
    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                deadline = time.time() + 1
                while not _loop_thread.loop and time.time() < deadline:
                    time.sleep(0.001)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

The above is the same general 'run async with Flask' solution as I posted for Make a Python asyncio call from a Flask route, but with the addition of the ThreadedChildWatcher backport.

You can then use run_coroutine() (which wraps asyncio.run_coroutine_threadsafe()) to run child processes on the loop managed by get_event_loop():

import asyncio
import concurrent.futures
import locale
import logging

logger = logging.getLogger(__name__)

def get_command_output(cmd, timeout=None):
    encoding = locale.getpreferredencoding(False)

    async def run_async():
        try:
            process = await asyncio.create_subprocess_exec(
                cmd, stdout=asyncio.subprocess.PIPE)
        except OSError:
            logger.exception("Process %s could not be started", cmd)
            return

        async for line in process.stdout:
            line = line.decode(encoding)
            # TODO: actually do something with the data.
            print(line, flush=True)

        # stdout has been closed, so the process is exiting or has exited
        try:
            process.kill()
        except ProcessLookupError:
            pass  # the process already exited

        returncode = await process.wait()
        logger.debug("Process for %s exited with %i", cmd, returncode)
        return returncode

    future = run_coroutine(run_async())
    result = None
    try:
        result = future.result(timeout)
    except concurrent.futures.TimeoutError:
        # future is a concurrent.futures.Future, so .result() raises
        # concurrent.futures.TimeoutError, not asyncio.TimeoutError
        logger.warning("The child process took too long, cancelling the task...")
        future.cancel()
    except Exception:
        logger.exception("The child process raised an exception")
    return result

Note that the above function takes an optional timeout, in seconds: the maximum amount of time you'll wait for the subprocess to complete.
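Tying it together, a Flask route could look something like the sketch below; the route, the /usr/local/bin/slow_report path, and the 30-second limit are illustrative assumptions, not part of the original code:

from flask import Flask

app = Flask(__name__)

@app.route("/report")
def report():
    # hypothetical executable and timeout, for illustration only
    result = get_command_output("/usr/local/bin/slow_report", timeout=30)
    if result is None:
        return "Process failed or timed out", 500
    return f"Process exited with returncode {result}"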

Asyncio function works when called from script but not from Flask route

You are trying to run your async subprocess from a thread other than the main thread. This requires some initial setup from the main thread; see the Subprocess and Threads section of the asyncio Subprocesses documentation:

Standard asyncio event loop supports running subprocesses from different threads, but there are limitations:

  • An event loop must run in the main thread.
  • The child watcher must be instantiated in the main thread before executing subprocesses from other threads. Call the get_child_watcher() function in the main thread to instantiate the child watcher.

What is happening here is that your WSGI server is using multiple threads to handle incoming requests, so the request handler is not running on the main thread. But your code uses asyncio.run() to start a new event loop on that worker thread, so your asyncio.create_subprocess_exec() call fails because there is no child watcher set up on the main thread.

You'd have to start a loop (and not stop it) from the main thread, and call asyncio.get_child_watcher() on that thread, for your code not to fail:

import asyncio
import threading

# to be run on the main thread: set up a subprocess child watcher
assert threading.current_thread() is threading.main_thread()
asyncio.get_event_loop()
asyncio.get_child_watcher()

Note: this restriction only applies to Python versions up to 3.7; it has been lifted in Python 3.8.

However, just to run a bunch of subprocesses and wait for these to complete, using asyncio is overkill; your OS can run subprocesses in parallel just fine. Just use subprocess.Popen() and check each process via the Popen.poll() method:

import subprocess
import time

def ping_proc(addr):
    return subprocess.Popen(
        ['ping', '-W', '1', '-c', '3', addr],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

def get_hosts(net):
    # net.hosts() yields IPv4Address objects
    procs = [ping_proc(str(addr)) for addr in net.hosts()]
    while any(p.poll() is None for p in procs):
        time.sleep(0.1)
    return [p.returncode for p in procs]

Popen.poll() does not block; if Popen.returncode is not yet set, it checks the process status with the OS using os.waitpid(pid, os.WNOHANG) and returns either None if the process is still running, or the now-available returncode value. The above just checks those statuses in a loop, with a short sleep in between to avoid thrashing.

The asyncio subprocess wrapper (on POSIX at least) either uses a SIGCHLD signal handler to be notified of exiting child processes, or (in Python 3.8) uses a separate thread per child process to make a blocking waitpid() call on each subprocess created. You could implement the same signal handler yourself, but take into account that signal handlers can only be registered on the main thread, so you'd have to jump through several hoops to communicate incoming SIGCHLD information to the right thread.
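For illustration only, a bare-bones POSIX version of such a handler is sketched below; it merely prints the results, and forwarding the (pid, returncode) pairs to whichever thread owns each subprocess, which is exactly the hard part, is left out:

import os
import signal

def on_sigchld(signum, frame):
    # reap every child that has exited since the last signal
    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            break  # no children left to wait for
        if pid == 0:
            break  # remaining children are still running
        if os.WIFSIGNALED(status):
            returncode = -os.WTERMSIG(status)
        else:
            returncode = os.WEXITSTATUS(status)
        print(f"child {pid} exited with returncode {returncode}")

# signal handlers can only be registered on the main thread
signal.signal(signal.SIGCHLD, on_sigchld)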

Starting and stopping a method using flask routes (Python)

Here is a working example. When you call /run, it starts a background thread that prints "running..." once a second. When you call /stop, it sets a global variable that the threaded function checks on each iteration, ending the loop.

You should be able to paste and go, then just modify for your purposes from there.

from flask import Flask, Response
from threading import Thread
from time import sleep

app = Flask(__name__)

stop_run = False

def my_function():
    global stop_run
    while not stop_run:
        sleep(1)
        print("running...")

def manual_run():
    t = Thread(target=my_function)
    t.start()
    return "Processing"

@app.route("/stop", methods=['GET'])
def set_stop_run():
    global stop_run
    stop_run = True
    return "Application stopped"

@app.route("/run", methods=['GET'])
def run_process():
    global stop_run
    stop_run = False
    return Response(manual_run(), mimetype="text/html")

if __name__ == "__main__":
    app.run()

How can I run an external command asynchronously from Python?

subprocess.Popen does exactly what you want.

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(Edit to complete the answer from comments)

The Popen instance can do various other things: you can poll() it to see if it is still running, and you can communicate() with it to send it data on stdin and wait for it to terminate.
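A small sketch of those two methods; sort is just a stand-in for any program that reads stdin:

from subprocess import Popen, PIPE

p = Popen(['sort'], stdin=PIPE, stdout=PIPE)
if p.poll() is None:  # None means the process is still running
    out, _ = p.communicate(input=b"b\na\n")  # write to stdin, wait for exit
    print(out.decode())  # prints "a" then "b"
print(p.returncode)  # 0 once the process has terminated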

How to terminate a python subprocess launched with shell=True

Use a process group so as to enable sending a signal to all the processes in the group. For that, you should attach a session id to the parent of the spawned/child processes, which is a shell in your case. This makes it the group leader of those processes, so that when a signal is sent to the process group leader, it's transmitted to all of the child processes of this group.

Here's the code:

import os
import signal
import subprocess

# os.setsid() is passed in the preexec_fn argument, so it is run after
# fork() and before exec() of the shell, making the shell the group leader.
pro = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                       shell=True, preexec_fn=os.setsid)

os.killpg(os.getpgid(pro.pid), signal.SIGTERM)  # send the signal to every process in the group
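If the command might ignore SIGTERM, a common refinement is to escalate to SIGKILL after a grace period; the sleep commands and the 5-second grace period below are illustrative assumptions:

import os
import signal
import subprocess

pro = subprocess.Popen("sleep 100 & sleep 200", shell=True,
                       preexec_fn=os.setsid)
os.killpg(os.getpgid(pro.pid), signal.SIGTERM)  # ask the group to exit
try:
    pro.wait(timeout=5)  # grace period
except subprocess.TimeoutExpired:
    os.killpg(os.getpgid(pro.pid), signal.SIGKILL)  # force the group to exit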

