How to Add a Timeout to a Function in Python

Timeout on a function call

You may use the signal package if you are running on UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 seconds after the call signal.alarm(10), the handler is called. This raises an exception that you can intercept from the regular Python code.

This module doesn't play well with threads (but then, who does?)

Note that since we raise an exception when timeout happens, it may end up caught and ignored inside the function, for example of one such function:

def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue

How to add a timeout to a function in Python

This question was asked over 9 years ago, and Python has changed a decent amount since then as has my repertoire of experience. After reviewing other APIs in the standard library and wanting to partially replicate one in particular, the follow module was written to serve a similar purpose as the one posted in the question.

asynchronous.py

#! /usr/bin/env python3
import _thread
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time

__all__ = (
'Executor',
'get_timeout',
'set_timeout',
'submit',
'map_',
'shutdown'
)

class _Base(metaclass=_abc.ABCMeta):
__slots__ = (
'__timeout',
)

@_abc.abstractmethod
def __init__(self, timeout):
self.timeout = _math.inf if timeout is None else timeout

def get_timeout(self):
return self.__timeout

def set_timeout(self, value):
if not isinstance(value, (float, int)):
raise TypeError('value must be of type float or int')
if value <= 0:
raise ValueError('value must be greater than zero')
self.__timeout = value

timeout = property(get_timeout, set_timeout)

def _run_and_catch(fn, args, kwargs):
# noinspection PyPep8,PyBroadException
try:
return False, fn(*args, **kwargs)
except:
return True, _sys.exc_info()[1]

def _run(fn, args, kwargs, queue):
queue.put_nowait(_run_and_catch(fn, args, kwargs))

class _State(_enum.IntEnum):
PENDING = _enum.auto()
RUNNING = _enum.auto()
CANCELLED = _enum.auto()
FINISHED = _enum.auto()
ERROR = _enum.auto()

def _run_and_catch_loop(iterable, *args, **kwargs):
exception = None
for fn in iterable:
error, value = _run_and_catch(fn, args, kwargs)
if error:
exception = value
if exception:
raise exception

class _Future(_Base):
__slots__ = (
'__queue',
'__process',
'__start_time',
'__callbacks',
'__result',
'__mutex'
)

def __init__(self, timeout, fn, args, kwargs):
super().__init__(timeout)
self.__queue = _multiprocessing.Queue(1)
self.__process = _multiprocessing.Process(
target=_run,
args=(fn, args, kwargs, self.__queue),
daemon=True
)
self.__start_time = _math.inf
self.__callbacks = _collections.deque()
self.__result = True, TimeoutError()
self.__mutex = _thread.allocate_lock()

@property
def __state(self):
pid, exitcode = self.__process.pid, self.__process.exitcode
return (_State.PENDING if pid is None else
_State.RUNNING if exitcode is None else
_State.CANCELLED if exitcode == -_signal.SIGTERM else
_State.FINISHED if exitcode == 0 else
_State.ERROR)

def __repr__(self):
root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
if self.__state < _State.CANCELLED:
return f'<{root}>'
error, value = self.__result
suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
return f'<{root} {suffix}>'

def __consume_callbacks(self):
while self.__callbacks:
yield self.__callbacks.popleft()

def __invoke_callbacks(self):
self.__process.join()
_run_and_catch_loop(self.__consume_callbacks(), self)

def cancel(self):
self.__process.terminate()
self.__invoke_callbacks()

def __auto_cancel(self):
elapsed_time = _time.perf_counter() - self.__start_time
if elapsed_time > self.timeout:
self.cancel()
return elapsed_time

def cancelled(self):
self.__auto_cancel()
return self.__state is _State.CANCELLED

def running(self):
self.__auto_cancel()
return self.__state is _State.RUNNING

def done(self):
self.__auto_cancel()
return self.__state > _State.RUNNING

def __handle_result(self, error, value):
self.__result = error, value
self.__invoke_callbacks()

def __ensure_termination(self):
with self.__mutex:
elapsed_time = self.__auto_cancel()
if not self.__queue.empty():
self.__handle_result(*self.__queue.get_nowait())
elif self.__state < _State.CANCELLED:
remaining_time = self.timeout - elapsed_time
if remaining_time == _math.inf:
remaining_time = None
try:
result = self.__queue.get(True, remaining_time)
except _queue.Empty:
self.cancel()
else:
self.__handle_result(*result)

def result(self):
self.__ensure_termination()
error, value = self.__result
if error:
raise value
return value

def exception(self):
self.__ensure_termination()
error, value = self.__result
if error:
return value

def add_done_callback(self, fn):
if self.done():
fn(self)
else:
self.__callbacks.append(fn)

def _set_running_or_notify_cancel(self):
if self.__state is _State.PENDING:
self.__process.start()
self.__start_time = _time.perf_counter()
else:
self.cancel()

class Executor(_Base):
__slots__ = (
'__futures',
)

def __init__(self, timeout=None):
super().__init__(timeout)
self.__futures = set()

def submit(self, fn, *args, **kwargs):
future = _Future(self.timeout, fn, args, kwargs)
self.__futures.add(future)
future.add_done_callback(self.__futures.remove)
# noinspection PyProtectedMember
future._set_running_or_notify_cancel()
return future

@staticmethod
def __cancel_futures(iterable):
_run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))

def map(self, fn, *iterables):
futures = tuple(self.submit(fn, *args) for args in zip(*iterables))

def result_iterator():
future_iterator = iter(futures)
try:
for future in future_iterator:
yield future.result()
finally:
self.__cancel_futures(future_iterator)

return result_iterator()

def shutdown(self):
self.__cancel_futures(frozenset(self.__futures))

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
return False

_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor

Timeout function if it takes too long to finish

The process for timing out an operations is described in the documentation for signal.

The basic idea is to use signal handlers to set an alarm for some time interval and raise an exception once that timer expires.

Note that this will only work on UNIX.

Here's an implementation that creates a decorator (save the following code as timeout.py).

import errno
import os
import signal
import functools

class TimeoutError(Exception):
pass

def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)

@functools.wraps(func)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result

return wrapper

return decorator

This creates a decorator called @timeout that can be applied to any long running functions.

So, in your application code, you can use the decorator like so:

from timeout import timeout

# Timeout a long running function with the default expiry of 10 seconds.
@timeout
def long_running_function1():
...

# Timeout after 5 seconds
@timeout(5)
def long_running_function2():
...

# Timeout after 30 seconds, with the error "Connection timed out"
@timeout(30, os.strerror(errno.ETIMEDOUT))
def long_running_function3():
...

Simplest way to timeout a Python function on Windows

depends on use case ... for this very specific example

import time

def hello(timeout=5):
start_time = time.time()
for _ in range(10):
if time.time()-start_time > timeout:
break
print("hello")
time.sleep(1)

hello()

is one way you could do it

alternatively you could use multiprocessing

import multiprocessing
...
if __name__ == "__main__":
proc = multiprocessing.Process(target=hello)
proc.start()
time.sleep(5)
proc.terminate()

How can I implement a timeout function?

Use signal.alarm, following the example given in the documentation of the signal module.

class TimesUpError(RuntimeException):
pass

def handler(signum, frame):
raise TimesUpError

signal.signal(signal.SIGALRM, handler)
signal.alarm(15)
try:
while SOME_CONDITION:
time.sleep(1)
except TimesUpError:
print("Times up, continuing")
finally:
signal.alarm(0)

Depending on what SOME_CONDITION is, you may be able to use signal.sigtimedwait instead of the loop.

Timeout a function (windows)?

I think a good way to approach this is to create a decorator and use the Thread.join(timeout=seconds) method. Bear in mind that there's no good way to kill the thread, so it will continue to run in the background, more or less, as long as your program is running.

First, create a decorator like this:

from threading import Thread
import functools

def timeout(timeout):
def deco(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
res = [Exception('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, timeout))]
def newFunc():
try:
res[0] = func(*args, **kwargs)
except Exception as e:
res[0] = e
t = Thread(target=newFunc)
t.daemon = True
try:
t.start()
t.join(timeout)
except Exception as je:
print ('error starting thread')
raise je
ret = res[0]
if isinstance(ret, BaseException):
raise ret
return ret
return wrapper
return deco

Then, do something like this:

func = timeout(timeout=16)(MyModule.MyFunc)
try:
func()
except:
pass #handle errors here

You can use this decorator anywhere you need with something like:

@timeout(60)
def f():
...


Related Topics



Leave a reply



Submit