How to start and stop a thread
This is David Heffernan's idea, fleshed out. The example below runs for 1 second, then stops for 1 second, then runs for 1 second, and so on.
import time
import threading
import datetime as DT
import logging
logger = logging.getLogger(__name__)

def worker(cond):
    i = 0
    while True:
        with cond:
            # Block until the main thread calls cond.notify()
            cond.wait()
            logger.info(i)
            time.sleep(0.01)
            i += 1

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

cond = threading.Condition()
t = threading.Thread(target=worker, args=(cond, ))
t.daemon = True
t.start()

start = DT.datetime.now()
while True:
    now = DT.datetime.now()
    if (now-start).total_seconds() > 60:
        break
    # Wake the worker only during odd-numbered seconds
    if now.second % 2:
        with cond:
            cond.notify()
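If the worker should also be able to exit cleanly, one possible variation (a minimal sketch, not part of the original answer) uses two threading.Event objects instead of a Condition: one gates the work and the other asks the loop to exit. The class name PausableWorker and its pause/resume/stop methods are made up for this illustration.

```python
import threading
import time


class PausableWorker(threading.Thread):
    """Hypothetical helper: a worker that can be paused, resumed and stopped."""

    def __init__(self):
        super().__init__(daemon=True)
        self._run_gate = threading.Event()      # set -> allowed to work
        self._exit_request = threading.Event()  # set -> leave the loop
        self.count = 0

    def run(self):
        while not self._exit_request.is_set():
            # Block while paused; the timeout lets the loop re-check the exit flag
            if not self._run_gate.wait(timeout=0.05):
                continue
            self.count += 1
            time.sleep(0.01)

    def resume(self):
        self._run_gate.set()

    def pause(self):
        self._run_gate.clear()

    def stop(self):
        self._exit_request.set()
        self.join()


if __name__ == "__main__":
    w = PausableWorker()
    w.start()
    w.resume()
    time.sleep(0.2)   # worker counts during this interval
    w.pause()
    time.sleep(0.2)   # worker is idle during this interval
    w.stop()
    print("iterations:", w.count)
```

Unlike the Condition version, the main thread does not have to spin calling notify(); it only flips the events when the state changes.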
How to start/stop/restart a thread in Java?
Once a thread stops you cannot restart it. However, there is nothing stopping you from creating and starting a new thread.
Option 1: Create a new thread rather than trying to restart.
Option 2: Instead of letting the thread stop, have it wait and then when it receives notification you can allow it to do work again. This way the thread never stops and will never need to be restarted.
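The same restriction applies to the Python threads used at the top of this page, so Option 1 can be illustrated there as well. This is only a sketch; the function names work and run_twice are invented for the example.

```python
import threading


def work(results):
    # Record which thread ran the job
    results.append(threading.current_thread().name)


def run_twice():
    results = []
    first = threading.Thread(target=work, args=(results,), name="first")
    first.start()
    first.join()

    try:
        first.start()  # a finished thread cannot be started again
    except RuntimeError:
        results.append("restart refused")

    # Option 1: build a fresh Thread object for the same target
    second = threading.Thread(target=work, args=(results,), name="second")
    second.start()
    second.join()
    return results


if __name__ == "__main__":
    print(run_twice())
```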
Edit based on comment:
To "kill" the thread you can do something like the following.
yourThread.setIsTerminating(true); // tell the thread to stop
yourThread.join(); // wait for the thread to stop
C# Threading - How to start and stop a thread
This is how I do it...
public class ThreadA {
    public ThreadA(object[] args) {
        ...
    }
    public void Run() {
        while (true) {
            Thread.Sleep(1000); // wait 1 second for something to happen
            doStuff();
            if (conditionToExitReceived) // what I'm waiting for
                break;
        }
        // perform cleanup if there is any
    }
}
Then to run this in its own thread (I do it this way because I also want to send args to the thread):
private void FireThread() {
    Thread thread = new Thread(new ThreadStart(this.startThread));
    thread.Start();
}
private void startThread() {
    new ThreadA(args).Run();
}
The thread is created by calling FireThread().
The newly created thread runs until its stop condition is met, then it exits.
You can signal the main thread with delegates to tell it when the worker has exited, so that it can then start the second one.
It is best to read through this MSDN article.
Java Start and Stop Threads
For anyone who might use TarsosDSP in the future and faces the same problem, the easiest way is to make a global dispatcher:
dispatcher = AudioDispatcherFactory.fromDefaultMicrophone(22050,1024,0);
And simply stop the thread with this:
dispatcher.stop();
Is there any way to kill a Thread?
It is generally a bad pattern to kill a thread abruptly, in Python, and in any language. Think of the following cases:
- the thread is holding a critical resource that must be closed properly
- the thread has created several other threads that must be killed as well.
The nice way of handling this, if you can afford it (if you are managing your own threads), is to have an exit_request flag that each thread checks on a regular interval to see if it is time for it to exit.
For example:
import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self, *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()
In this code, you should call stop() on the thread when you want it to exit, and wait for the thread to exit properly using join(). The thread should check the stop flag at regular intervals.
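As an illustration of "checking the flag at regular intervals", here is a minimal sketch of a subclass whose run() loop polls the stop event. The Ticker class and its attributes are hypothetical; StoppableThread is repeated so that the snippet runs on its own.

```python
import threading
import time


class StoppableThread(threading.Thread):
    """Same idea as above: a thread with a cooperative stop() request."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()


class Ticker(StoppableThread):
    """Hypothetical worker used only for this illustration."""

    def __init__(self, interval=0.01):
        super().__init__()
        self.interval = interval
        self.ticks = 0
        self.cleaned_up = False

    def run(self):
        try:
            # Event.wait() doubles as an interruptible sleep: it returns True
            # as soon as stop() is called, so the loop exits without delay.
            while not self._stop_event.wait(self.interval):
                self.ticks += 1
        finally:
            # Release resources here; this runs on every exit path
            self.cleaned_up = True


def demo():
    t = Ticker()
    t.start()
    time.sleep(0.1)  # let the worker tick a few times
    t.stop()         # request the exit
    t.join()         # wait until run() has returned
    return t


if __name__ == "__main__":
    finished = demo()
    print(finished.ticks, finished.cleaned_up)
```

Waiting on the event instead of calling time.sleep() keeps the shutdown latency independent of the polling interval.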
There are cases, however, when you really need to kill a thread. An example is when you are wrapping an external library that is busy for long calls, and you want to interrupt it.
The following code makes it possible (with some restrictions) to raise an exception in a Python thread:
import ctypes
import inspect
import threading

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising an exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL: this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        # isAlive() was removed in Python 3.9; is_alive() is the current name
        if not self.is_alive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.is_alive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL: this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )
(Based on Killable Threads by Tomer Filiba. The quote about the return value of PyThreadState_SetAsyncExc appears to be from an old version of Python.)
As noted in the documentation, this is not a magic bullet because if the thread is busy outside the Python interpreter, it will not catch the interruption.
A good usage pattern of this code is to have the thread catch a specific exception and perform the cleanup. That way, you can interrupt a task and still have proper cleanup.
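A minimal sketch of that usage pattern follows, assuming CPython (PyThreadState_SetAsyncExc is a CPython C API function reached through ctypes). The exception class CancelTask and the helper names async_raise, task and run_demo are invented for this example; the thread id is taken from Thread.ident rather than from the lookup shown above.

```python
import ctypes
import threading
import time


class CancelTask(Exception):
    """Hypothetical exception type used only to interrupt the worker."""


def async_raise(thread, exctype):
    # Ask CPython to raise exctype in `thread` the next time it runs bytecode
    tid = ctypes.c_ulong(thread.ident)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    if res > 1:
        # More than one thread state was affected: revert and report
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


events = []


def task():
    try:
        while True:
            time.sleep(0.01)  # short sleeps so the exception is noticed quickly
    except CancelTask:
        events.append("interrupted")
    finally:
        events.append("cleaned up")  # cleanup runs even when interrupted


def run_demo():
    t = threading.Thread(target=task)
    t.start()
    time.sleep(0.05)          # give the worker time to enter its loop
    async_raise(t, CancelTask)
    t.join(timeout=2)
    return t.is_alive(), list(events)


if __name__ == "__main__":
    print(run_demo())
```

The worker catches only the dedicated exception type, so ordinary errors in the task still propagate as usual.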
Start/stop thread
Java operates on a cooperative interrupt model for stopping threads. That means you can't simply stop a thread mid-execution without cooperation from the thread itself. To stop a thread, the client calls the Thread.interrupt() method to request that the thread stop:
public class SomeBackgroundProcess implements Runnable {
    Thread backgroundThread;

    public void start() {
        if( backgroundThread == null ) {
            backgroundThread = new Thread( this );
            backgroundThread.start();
        }
    }

    public void stop() {
        if( backgroundThread != null ) {
            backgroundThread.interrupt();
        }
    }

    public void run() {
        try {
            Log.i("Thread starting.");
            // run() executes on backgroundThread, so check the current thread's flag
            while( !Thread.currentThread().isInterrupted() ) {
                // doSomething() is assumed to be declared as throwing InterruptedException
                doSomething();
            }
            Log.i("Thread stopping.");
        } catch( InterruptedException ex ) {
            // important you respond to the InterruptedException and stop processing
            // when it's thrown! Notice this is outside the while loop.
            Log.i("Thread shutting down as it was requested to stop.");
        } finally {
            backgroundThread = null;
        }
    }
}
The important part of threading is that you don't swallow InterruptedException and instead stop your thread's loop and shut down, because you only get this exception if a client has requested that the thread interrupt itself.
So you simply need to hook up the SomeBackgroundProcess.start() to the event for unlock, and hook up the SomeBackgroundProcess.stop() to the lock event.
How to start and stop a worker thread
In short, the T in join() on a JoinHandle<T> returns the result of the closure passed to thread::spawn(). So in your case JoinHandle<?> would need to be JoinHandle<()> as your closure returns nothing, i.e. () (unit).
Other than that, your dummy code contains a few additional issues.
- The return type of run() is incorrect, and would need to at least be Result<(), ()>.
- The thread field would need to be Option<JoinHandle<()>> to be able to handle fn stop(&mut self), as join() consumes the JoinHandle.
- However, you're attempting to pass &mut self to the closure, which brings a lot more issues, boiling down to multiple mutable references.
  - This could be solved with e.g. Mutex<A>. However, if you call stop() then that could lead to a deadlock instead.
However, since it was dummy code and you clarified in the comments, let me try to clarify what you meant with a few examples. This includes rewriting your dummy code.
Result after worker is done
If you don't need access to the data while the worker thread is running, then you can make a new struct WorkerData. Then in run() you copy/clone the data you need from A (or as I've renamed it, Worker). Then in the closure you finally return data again, so you can acquire it through join().
use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get it through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}
You don't really need thread to be Option<JoinHandle<WorkerData>> and instead could just use JoinHandle<WorkerData>, because if you wanted to call run() again, it would just be easier to reassign the variable holding the Worker.
So now we can simplify Worker, removing the Option and changing stop to consume thread instead, along with creating new() -> Self in place of run(&mut self).
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}
Shared WorkerData
If you want to retain references to WorkerData between multiple threads, then you'd need to use Arc. Since you additionally want to be able to mutate it, you'll need to use a Mutex.
If you'll only be mutating within a single thread, then you could alternatively use a RwLock, which compared to a Mutex will allow you to lock and obtain multiple immutable references at the same time.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}
If you add a method to obtain data in the form of Arc<RwLock<WorkerData>>, and you then clone the Arc and lock it (the inner RwLock) prior to calling stop(), that would result in a deadlock.
To avoid that, any data() method should return &WorkerData or &mut WorkerData instead of the Arc. That way you'd be unable to call stop() and cause a deadlock.
Flag to stop worker
If you actually want to stop the worker thread, then you'd have to use a flag to signal it to do so. You can create a flag in the form of a shared AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}
Multiple threads and multiple tasks
If you want multiple kinds of tasks processed, spread across multiple threads, then here's a more generalized example.
You already mentioned using mpsc. So you can use a Sender and Receiver along with a custom Task and TaskResult enum.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}
An example of using the Worker, along with sending tasks and receiving task results, would then look like this:
fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}
In a few cases you might need to implement Send for Task and/or TaskResult. Check out "Understanding the Send trait".
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Stop and restart an already running thread
Java threads are not restartable. For what you are trying to achieve, you could create a new thread each time, or you could look at an ExecutorService. Just create a single threaded executor (Executors.newSingleThreadExecutor), and submit your runnable to it every time you need it to run.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(runnable);
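For comparison with the Python examples earlier on this page, a rough Python counterpart of the single-threaded executor is concurrent.futures.ThreadPoolExecutor with max_workers=1. This is only a sketch; the names job and run_jobs are invented for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def job(n):
    # Return the result together with the name of the thread that ran it
    return n * n, threading.current_thread().name


def run_jobs():
    # One worker thread that stays alive between submissions
    with ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(job, 3).result()
        second = executor.submit(job, 4).result()
    return first, second


if __name__ == "__main__":
    print(run_jobs())
```

Both submissions run on the same pooled thread, so nothing has to be "restarted"; each new piece of work is simply queued.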