How to Run a Function Periodically in Python

Executing periodic actions

At the end of foo(), create a Timer that calls foo() again after 10 seconds.

Because each Timer calls foo() in a new thread, the main thread is not blocked and you can do other work in the meantime.

import time, threading

def foo():
    print(time.ctime())
    threading.Timer(10, foo).start()

foo()

#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011
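The chain of timers above never ends on its own. A minimal sketch of one way to stop it, assuming a threading.Event is used as an off switch; the names run_periodically, stop and ticks are hypothetical and only serve this illustration:

```python
import threading
import time

def run_periodically(interval, func, stop_event):
    """Call func now, then re-arm a Timer until stop_event is set."""
    if stop_event.is_set():
        return
    func()
    timer = threading.Timer(interval, run_periodically,
                            args=(interval, func, stop_event))
    timer.daemon = True  # do not keep the interpreter alive just for the timer
    timer.start()

stop = threading.Event()   # hypothetical "off switch"
ticks = []                 # records the time of each call

run_periodically(0.05, lambda: ticks.append(time.ctime()), stop)
time.sleep(0.3)            # the main thread is free to do other work here
stop.set()                 # no new calls are scheduled once this is set
```

A timer that is already pending when stop is set still fires once, but it returns immediately without calling func.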

How to repeatedly execute a function every x seconds?

If your program doesn't already have an event loop, use the sched module, which implements a general-purpose event scheduler.

import sched, time

s = sched.scheduler(time.time, time.sleep)

def do_something(sc):
    print("Doing stuff...")
    # do your stuff
    sc.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()
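Since s.run() blocks until the event queue is empty, the example above runs forever. A small sketch of a bounded variant, assuming you want a fixed number of calls; the remaining counter and the calls list are hypothetical additions for illustration:

```python
import sched
import time

scheduler = sched.scheduler(time.monotonic, time.sleep)
calls = []  # timestamps of each call, for illustration only

def do_something(remaining, interval):
    calls.append(time.monotonic())
    # Re-schedule only while calls remain, so the queue eventually drains.
    if remaining > 1:
        scheduler.enter(interval, 1, do_something, (remaining - 1, interval))

scheduler.enter(0.05, 1, do_something, (3, 0.05))
scheduler.run()  # returns once the queue is empty, here after 3 calls
```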

If you're already using an event loop library such as asyncio, trio, tkinter, PyQt5, gobject, kivy, or one of many others, schedule the task with that library's own methods instead.
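For asyncio, one possible shape is a coroutine that sleeps between calls. This is only a sketch under that assumption; the helper name every and its times parameter are hypothetical, not part of asyncio:

```python
import asyncio

async def every(interval, func, times):
    """Call func `times` times, awaiting `interval` seconds after each call."""
    for _ in range(times):
        func()
        await asyncio.sleep(interval)

async def main():
    results = []
    # The periodic task shares the event loop with any other coroutines.
    task = asyncio.create_task(every(0.01, lambda: results.append("tick"), 3))
    await asyncio.sleep(0)  # other work could be awaited here
    await task
    return results

results = asyncio.run(main())
```

Because asyncio.sleep yields control to the loop, other tasks keep running between ticks.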

Execute a function periodically in Python every n milliseconds (Python 2)

I tried it myself and it works, but it seems that Python isn't that fast in the console. In my case it only reaches about 60 runs per second, even though the timer below asks for a 1 ms interval.

import threading
import time

def hello(*args):
    print(str(args[0]) + " It's " + str(time.ctime()))
    next_count = int(args[0]) + 1  # renamed to avoid shadowing the built-in next()
    threading.Timer(0.001, hello, [str(next_count)]).start()

hello("1")
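Part of the cost above is that every tick starts a new thread. A sketch of an alternative, assuming one long-lived worker thread is acceptable: it loops against time.monotonic() deadlines instead of re-arming a Timer. The names periodic_loop, tick and count are hypothetical, and the achievable rate still depends on the operating system's timer resolution.

```python
import threading
import time

def periodic_loop(interval, func, stop_event):
    """Call func every `interval` seconds from one thread until stop_event is set."""
    next_deadline = time.monotonic()
    while not stop_event.is_set():
        func()
        next_deadline += interval
        # Wait only for the time left until the next deadline, so time spent
        # inside func does not accumulate as drift.
        remaining = next_deadline - time.monotonic()
        if remaining > 0:
            stop_event.wait(remaining)

stop = threading.Event()
count = [0]  # mutable counter shared with the worker thread

def tick():
    count[0] += 1

worker = threading.Thread(target=periodic_loop, args=(0.001, tick, stop), daemon=True)
worker.start()
time.sleep(0.2)   # let it run briefly
stop.set()
worker.join()
```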

Periodically execute function in thread in real time, every N seconds

The simple solution

import threading

def work():
    threading.Timer(0.25, work).start()
    print("stackoverflow")

work()

The above runs work four times per second. Each call first "queues" another call to itself 0.25 seconds into the future and then carries on, without hanging around waiting for that to happen.

Because of this it can do its work (almost) entirely uninterrupted, and the function runs very close to exactly 4 times per second.


More about threading.Timer can be found in the Python documentation:

  • docs.python.org - 16.2.7. Timer Objects


[RECOMMENDED] The more advanced/dynamic solution

Even though the previous function works as expected, you could create a helper function to deal with future timed events.

Something like the following is sufficient for this example. Hopefully the code speaks for itself; it is not as advanced as it might appear.

Treat it as inspiration for implementing your own wrapper to fit your exact needs.

import threading

def do_every(interval, worker_func, iterations=0):
    # iterations == 0 means "repeat forever"; otherwise count down to 1
    if iterations != 1:
        threading.Timer(
            interval,
            do_every, [interval, worker_func, 0 if iterations == 0 else iterations - 1]
        ).start()

    worker_func()

def print_hw():
    print("hello world")

def print_so():
    print("stackoverflow")

# call print_so every second, 5 times total
do_every(1, print_so, 5)

# call print_hw two times per second, forever
do_every(0.5, print_hw)


How to periodically call a function using multiprocessing in Python?

Basically, there is no Timer class in the multiprocessing module. There arguably should be one, but it seems that it was not carried over when pyprocessing was migrated to multiprocessing. This is explained here: https://stackoverflow.com/a/25297863/6598433.

You can write a working timer for the multiprocessing library yourself, as dano posted in https://stackoverflow.com/a/25297758/6598433:

from multiprocessing import Process, Event

class Timer(Process):
    def __init__(self, interval, function, args=[], kwargs={}):
        super(Timer, self).__init__()
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
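The Timer above fires only once. For periodic calls, one possible adaptation is to loop on the event's wait() until cancel() is called. This is a sketch, not part of dano's answer; the class name RepeatingTimer is hypothetical:

```python
from multiprocessing import Process, Event

class RepeatingTimer(Process):
    """Hypothetical variant of the Timer above that fires repeatedly."""

    def __init__(self, interval, function, args=None, kwargs=None):
        super().__init__()
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer; no further calls are made once this is set."""
        self.finished.set()

    def run(self):
        # Event.wait returns False on timeout and True once cancel() was called.
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)
```

Typical use would be to create the timer, call start(), and later call cancel() followed by join(), all under an `if __name__ == "__main__":` guard so that child processes can import the module safely.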

See the full question: Why no Timer class in Python's multiprocessing module?

How to make a function run for specific duration periodically

I would suggest using the Advanced Python Scheduler (APScheduler), and more specifically its interval trigger, for example:

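from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()
sched.add_job(yourFunction, 'interval', seconds=10)  # yourFunction: the function you want to run
sched.start()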

EDIT
Here's a more complete example:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def myFunction(testParam):
    print("Message: {}".format(testParam))

if __name__ == '__main__':
    sched.add_job(myFunction, 'interval', seconds=10, args=["Works!"])
    sched.start()

