Timeout on a function call
You may use the signal package if you are running on UNIX:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
10 seconds after the call signal.alarm(10)
, the handler is called. This raises an exception that you can intercept from the regular Python code.
This module doesn't play well with threads (but then, who does?)
Note that since we raise an exception when timeout happens, it may end up caught and ignored inside the function, for example of one such function:
def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue
How to add a timeout to a function in Python
This question was asked over 9 years ago, and Python has changed a decent amount since then as has my repertoire of experience. After reviewing other APIs in the standard library and wanting to partially replicate one in particular, the follow module was written to serve a similar purpose as the one posted in the question.
asynchronous.py
#! /usr/bin/env python3
import _thread
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time
__all__ = (
'Executor',
'get_timeout',
'set_timeout',
'submit',
'map_',
'shutdown'
)
class _Base(metaclass=_abc.ABCMeta):
__slots__ = (
'__timeout',
)
@_abc.abstractmethod
def __init__(self, timeout):
self.timeout = _math.inf if timeout is None else timeout
def get_timeout(self):
return self.__timeout
def set_timeout(self, value):
if not isinstance(value, (float, int)):
raise TypeError('value must be of type float or int')
if value <= 0:
raise ValueError('value must be greater than zero')
self.__timeout = value
timeout = property(get_timeout, set_timeout)
def _run_and_catch(fn, args, kwargs):
# noinspection PyPep8,PyBroadException
try:
return False, fn(*args, **kwargs)
except:
return True, _sys.exc_info()[1]
def _run(fn, args, kwargs, queue):
queue.put_nowait(_run_and_catch(fn, args, kwargs))
class _State(_enum.IntEnum):
PENDING = _enum.auto()
RUNNING = _enum.auto()
CANCELLED = _enum.auto()
FINISHED = _enum.auto()
ERROR = _enum.auto()
def _run_and_catch_loop(iterable, *args, **kwargs):
exception = None
for fn in iterable:
error, value = _run_and_catch(fn, args, kwargs)
if error:
exception = value
if exception:
raise exception
class _Future(_Base):
__slots__ = (
'__queue',
'__process',
'__start_time',
'__callbacks',
'__result',
'__mutex'
)
def __init__(self, timeout, fn, args, kwargs):
super().__init__(timeout)
self.__queue = _multiprocessing.Queue(1)
self.__process = _multiprocessing.Process(
target=_run,
args=(fn, args, kwargs, self.__queue),
daemon=True
)
self.__start_time = _math.inf
self.__callbacks = _collections.deque()
self.__result = True, TimeoutError()
self.__mutex = _thread.allocate_lock()
@property
def __state(self):
pid, exitcode = self.__process.pid, self.__process.exitcode
return (_State.PENDING if pid is None else
_State.RUNNING if exitcode is None else
_State.CANCELLED if exitcode == -_signal.SIGTERM else
_State.FINISHED if exitcode == 0 else
_State.ERROR)
def __repr__(self):
root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
if self.__state < _State.CANCELLED:
return f'<{root}>'
error, value = self.__result
suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
return f'<{root} {suffix}>'
def __consume_callbacks(self):
while self.__callbacks:
yield self.__callbacks.popleft()
def __invoke_callbacks(self):
self.__process.join()
_run_and_catch_loop(self.__consume_callbacks(), self)
def cancel(self):
self.__process.terminate()
self.__invoke_callbacks()
def __auto_cancel(self):
elapsed_time = _time.perf_counter() - self.__start_time
if elapsed_time > self.timeout:
self.cancel()
return elapsed_time
def cancelled(self):
self.__auto_cancel()
return self.__state is _State.CANCELLED
def running(self):
self.__auto_cancel()
return self.__state is _State.RUNNING
def done(self):
self.__auto_cancel()
return self.__state > _State.RUNNING
def __handle_result(self, error, value):
self.__result = error, value
self.__invoke_callbacks()
def __ensure_termination(self):
with self.__mutex:
elapsed_time = self.__auto_cancel()
if not self.__queue.empty():
self.__handle_result(*self.__queue.get_nowait())
elif self.__state < _State.CANCELLED:
remaining_time = self.timeout - elapsed_time
if remaining_time == _math.inf:
remaining_time = None
try:
result = self.__queue.get(True, remaining_time)
except _queue.Empty:
self.cancel()
else:
self.__handle_result(*result)
def result(self):
self.__ensure_termination()
error, value = self.__result
if error:
raise value
return value
def exception(self):
self.__ensure_termination()
error, value = self.__result
if error:
return value
def add_done_callback(self, fn):
if self.done():
fn(self)
else:
self.__callbacks.append(fn)
def _set_running_or_notify_cancel(self):
if self.__state is _State.PENDING:
self.__process.start()
self.__start_time = _time.perf_counter()
else:
self.cancel()
class Executor(_Base):
__slots__ = (
'__futures',
)
def __init__(self, timeout=None):
super().__init__(timeout)
self.__futures = set()
def submit(self, fn, *args, **kwargs):
future = _Future(self.timeout, fn, args, kwargs)
self.__futures.add(future)
future.add_done_callback(self.__futures.remove)
# noinspection PyProtectedMember
future._set_running_or_notify_cancel()
return future
@staticmethod
def __cancel_futures(iterable):
_run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))
def map(self, fn, *iterables):
futures = tuple(self.submit(fn, *args) for args in zip(*iterables))
def result_iterator():
future_iterator = iter(futures)
try:
for future in future_iterator:
yield future.result()
finally:
self.__cancel_futures(future_iterator)
return result_iterator()
def shutdown(self):
self.__cancel_futures(frozenset(self.__futures))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
return False
_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor
Timeout function if it takes too long to finish
The process for timing out an operations is described in the documentation for signal.
The basic idea is to use signal handlers to set an alarm for some time interval and raise an exception once that timer expires.
Note that this will only work on UNIX.
Here's an implementation that creates a decorator (save the following code as timeout.py
).
import errno
import os
import signal
import functools
class TimeoutError(Exception):
pass
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
@functools.wraps(func)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wrapper
return decorator
This creates a decorator called @timeout
that can be applied to any long running functions.
So, in your application code, you can use the decorator like so:
from timeout import timeout
# Timeout a long running function with the default expiry of 10 seconds.
@timeout
def long_running_function1():
...
# Timeout after 5 seconds
@timeout(5)
def long_running_function2():
...
# Timeout after 30 seconds, with the error "Connection timed out"
@timeout(30, os.strerror(errno.ETIMEDOUT))
def long_running_function3():
...
Simplest way to timeout a Python function on Windows
depends on use case ... for this very specific example
import time
def hello(timeout=5):
start_time = time.time()
for _ in range(10):
if time.time()-start_time > timeout:
break
print("hello")
time.sleep(1)
hello()
is one way you could do it
alternatively you could use multiprocessing
import multiprocessing
...
if __name__ == "__main__":
proc = multiprocessing.Process(target=hello)
proc.start()
time.sleep(5)
proc.terminate()
How can I implement a timeout function?
Use signal.alarm
, following the example given in the documentation of the signal
module.
class TimesUpError(RuntimeException):
pass
def handler(signum, frame):
raise TimesUpError
signal.signal(signal.SIGALRM, handler)
signal.alarm(15)
try:
while SOME_CONDITION:
time.sleep(1)
except TimesUpError:
print("Times up, continuing")
finally:
signal.alarm(0)
Depending on what SOME_CONDITION
is, you may be able to use signal.sigtimedwait
instead of the loop.
Timeout a function (windows)?
I think a good way to approach this is to create a decorator and use the Thread.join(timeout=seconds)
method. Bear in mind that there's no good way to kill the thread, so it will continue to run in the background, more or less, as long as your program is running.
First, create a decorator like this:
from threading import Thread
import functools
def timeout(timeout):
def deco(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
res = [Exception('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, timeout))]
def newFunc():
try:
res[0] = func(*args, **kwargs)
except Exception as e:
res[0] = e
t = Thread(target=newFunc)
t.daemon = True
try:
t.start()
t.join(timeout)
except Exception as je:
print ('error starting thread')
raise je
ret = res[0]
if isinstance(ret, BaseException):
raise ret
return ret
return wrapper
return deco
Then, do something like this:
func = timeout(timeout=16)(MyModule.MyFunc)
try:
func()
except:
pass #handle errors here
You can use this decorator anywhere you need with something like:
@timeout(60)
def f():
...
Related Topics
Groupby Column and Filter Rows with Maximum Value in Pyspark
Is There a Difference Between Using a Dict Literal and a Dict Constructor
Too Many Values to Unpack Calling Cv2.Findcontours
How to Merge Images into a Canvas Using Pil/Pillow
Why Does Pandas Apply Calculate Twice
Using the Multiprocessing Module for Cluster Computing
Why Is the Time Complexity of Python's List.Append() Method O(1)
Python: Start New Command Prompt on Windows and Wait for It Finish/Exit
How to Return a String from a Regex Match in Python
Tuple or List When Using 'In' in an 'If' Clause
Cleanest Way to Hide Every Nth Tick Label in Matplotlib Colorbar
Python Command Line Input in a Process
Read a File Line by Line from S3 Using Boto
How to Normalize JSON Correctly by Python Pandas
Find Out How Many Times a Regex Matches in a String in Python