How to Start and Stop a Thread

This is David Heffernan's idea, fleshed out. The example below lets the worker run for 1 second, then pauses it for 1 second, then lets it run for 1 second, and so on.

import time
import threading
import datetime as DT
import logging
logger = logging.getLogger(__name__)

def worker(cond):
    i = 0
    while True:
        with cond:
            cond.wait()
            logger.info(i)
            time.sleep(0.01)
            i += 1

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

cond = threading.Condition()
t = threading.Thread(target=worker, args=(cond, ))
t.daemon = True
t.start()

start = DT.datetime.now()
while True:
    now = DT.datetime.now()
    if (now-start).total_seconds() > 60: break
    if now.second % 2:
        with cond:
            cond.notify()
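
In the example above the main loop spins continuously while it notifies the worker. A variation that avoids this, sketched here with threading.Event rather than a Condition, is shown below. The class name PausableWorker, its methods, and the ticks counter are made up for illustration and are not part of the original answer.

```python
import threading
import time

class PausableWorker(threading.Thread):
    """Hypothetical helper: a worker that can be paused, resumed and stopped."""

    def __init__(self):
        super().__init__(daemon=True)
        self._run_event = threading.Event()   # set   -> worker may do work
        self._stop_event = threading.Event()  # set   -> worker should exit
        self.ticks = 0

    def run(self):
        while not self._stop_event.is_set():
            # Block while paused; the timeout lets us re-check the stop flag.
            if not self._run_event.wait(timeout=0.05):
                continue
            self.ticks += 1        # stand-in for one unit of real work
            time.sleep(0.01)

    def resume(self):
        self._run_event.set()

    def pause(self):
        self._run_event.clear()

    def stop(self):
        self._stop_event.set()
        self.join()
```

The worker starts paused. Call resume() to let it work, pause() to hold it, and stop() to end it for good. The main thread is free to sleep or do other work in between.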

How to start/stop/restart a thread in Java?

Once a thread stops you cannot restart it. However, there is nothing stopping you from creating and starting a new thread.

Option 1: Create a new thread rather than trying to restart.

Option 2: Instead of letting the thread stop, have it wait and then when it receives notification you can allow it to do work again. This way the thread never stops and will never need to be restarted.
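
The same restriction applies to Python's threading.Thread, so Option 1 can be illustrated in Python (used here only to keep the added examples on this page in one language). This is a minimal sketch: the task function and the thread names are invented for the demonstration.

```python
import threading

def task(results):
    # Record which thread ran the task.
    results.append(threading.current_thread().name)

results = []
restart_error = None

t = threading.Thread(target=task, args=(results,), name="first")
t.start()
t.join()

try:
    t.start()  # a thread object can only be started once
except RuntimeError as exc:
    restart_error = exc

# Option 1: create a fresh thread object for the same target instead.
t2 = threading.Thread(target=task, args=(results,), name="second")
t2.start()
t2.join()
```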

Edit based on comment:

To "kill" the thread, set a flag that the thread checks (setIsTerminating below stands for a method you write yourself) and then wait for it to finish:

yourThread.setIsTerminating(true); // tell the thread to stop
yourThread.join(); // wait for the thread to stop

C# Threading - How to start and stop a thread

This is how I do it...

using System.Threading;

public class ThreadA {
    public ThreadA(object[] args) {
        ...
    }
    public void Run() {
        while (true) {
            Thread.Sleep(1000); // wait 1 second for something to happen.
            doStuff();
            if (conditionToExitReceived) // what I'm waiting for...
                break;
        }
        // perform cleanup if there is any...
    }
}

Then to run this in its own thread... ( I do it this way because I also want to send args to the thread)

private void FireThread() {
    Thread thread = new Thread(new ThreadStart(this.startThread));
    thread.Start();
}
private void startThread() {
    new ThreadA(args).Run();
}

The thread is created by calling "FireThread()"

The newly created thread will run until its condition to stop is met, then it dies...

You can signal the "main" thread with delegates to tell it when the thread has died, so you can then start the second one.

Best to read through : This MSDN Article

Java Start and Stop Threads

For anyone who uses TarsosDSP in the future and faces the same problem, the easiest way is to make the dispatcher global:

dispatcher = AudioDispatcherFactory.fromDefaultMicrophone(22050,1024,0);

And simply stop the Thread with this:

dispatcher.stop();

Is there any way to kill a Thread?

It is generally a bad pattern to kill a thread abruptly, in Python, and in any language. Think of the following cases:

  • the thread is holding a critical resource that must be closed properly
  • the thread has created several other threads that must be killed as well.

The nice way of handling this, if you can afford it (if you are managing your own threads), is to have an exit_request flag that each thread checks on a regular interval to see if it is time for it to exit.

For example:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self, *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

In this code, you should call stop() on the thread when you want it to exit, and wait for the thread to exit properly using join(). The thread should check the stop flag at regular intervals.
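
As a rough illustration of that usage, the sketch below subclasses StoppableThread (repeated here so the snippet runs on its own). CountingThread, its count attribute, and the cleaned_up flag are hypothetical names added for the example.

```python
import threading
import time

class StoppableThread(threading.Thread):
    """Same idea as above: a thread with a cooperative stop flag."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

class CountingThread(StoppableThread):
    """Hypothetical worker that checks the stop flag on every iteration."""

    def __init__(self):
        super().__init__()
        self.count = 0
        self.cleaned_up = False

    def run(self):
        try:
            while not self.stopped():
                self.count += 1              # stand-in for real work
                # Event.wait doubles as a sleep that ends early on stop().
                self._stop_event.wait(0.01)
        finally:
            self.cleaned_up = True           # release resources here

worker = CountingThread()
worker.start()
time.sleep(0.1)
worker.stop()   # request the exit
worker.join()   # wait until the thread has actually finished
```

Waiting on the event instead of calling time.sleep() means the thread reacts to stop() right away rather than at the end of the sleep interval.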

There are cases, however, when you really need to kill a thread. An example is when you are wrapping an external library that is busy for long calls, and you want to interrupt it.

The following code allows you (with some restrictions) to raise an Exception in a Python thread:

import ctypes
import inspect
import threading
import time

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising an exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL: this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        # is_alive() replaces isAlive(), which was removed in Python 3.9
        if not self.is_alive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.is_alive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL: this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(Based on Killable Threads by Tomer Filiba. The quote about the return value of PyThreadState_SetAsyncExc appears to be from an old version of Python.)

As noted in the documentation, this is not a magic bullet because if the thread is busy outside the Python interpreter, it will not catch the interruption.

A good usage pattern of this code is to have the thread catch a specific exception and perform the cleanup. That way, you can interrupt a task and still have proper cleanup.
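
A minimal sketch of that usage pattern follows. It is CPython-specific, uses thread.ident rather than the lookup above, and passes the id as an unsigned long (an assumption that matches recent CPython versions). The Cancelled exception, the async_raise helper, and the state dict are hypothetical names introduced for the example.

```python
import ctypes
import threading
import time

class Cancelled(Exception):
    """Hypothetical exception type used only to interrupt the worker."""

def async_raise(thread, exctype):
    """Ask CPython to raise exctype inside the given running thread."""
    tid = ctypes.c_ulong(thread.ident)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    if res > 1:
        # More than one thread state was modified: revert and report.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

state = {"cleaned_up": False}

def long_task():
    try:
        while True:
            # Short calls, so the interpreter gets a chance to deliver
            # the exception between them.
            time.sleep(0.01)
    except Cancelled:
        pass                          # interruption requested: fall through
    finally:
        state["cleaned_up"] = True    # cleanup runs whether or not we were interrupted

t = threading.Thread(target=long_task)
t.start()
time.sleep(0.05)
async_raise(t, Cancelled)
t.join(timeout=5)
```

The exception is only delivered while the thread is executing Python bytecode, so a worker blocked in a long external call would not see it until that call returns.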

Start/stop thread

Java operates on a cooperative interrupt model for stopping threads. That means you can't simply stop a thread mid-execution without cooperation from the thread itself. To stop a thread, the client calls the Thread.interrupt() method to request that the thread stop:

public class SomeBackgroundProcess implements Runnable {

    Thread backgroundThread;

    public void start() {
        if( backgroundThread == null ) {
            backgroundThread = new Thread( this );
            backgroundThread.start();
        }
    }

    public void stop() {
        if( backgroundThread != null ) {
            backgroundThread.interrupt();
        }
    }

    public void run() {
        try {
            Log.i("Thread starting.");
            // isInterrupted() checks the current thread's flag without clearing it
            while( !Thread.currentThread().isInterrupted() ) {
                // doSomething() is assumed to be declared as throwing InterruptedException
                doSomething();
            }
            Log.i("Thread stopping.");
        } catch( InterruptedException ex ) {
            // important you respond to the InterruptedException and stop processing
            // when it's thrown! Notice this is outside the while loop.
            Log.i("Thread shutting down as it was requested to stop.");
        } finally {
            backgroundThread = null;
        }
    }
}

The important part of threading is that you don't swallow InterruptedException. Instead, stop your thread's loop and shut down, because you only get this exception if a client has requested that the thread be interrupted.

So you simply need to hook up the SomeBackgroundProcess.start() to the event for unlock, and hook up the SomeBackgroundProcess.stop() to the lock event.

How to start and stop a worker thread

In short, join() on a JoinHandle<T> returns the result of the closure passed to thread::spawn(), and T is that closure's return type. So in your case JoinHandle<?> would need to be JoinHandle<()>, as your closure returns nothing, i.e. () (unit).

Other than that, your dummy code contains a few additional issues.

  • The return type of run() is incorrect, and would need to at least be Result<(), ()>.
  • The thread field would need to be Option<JoinHandle<()>> to be able to handle fn stop(&mut self), as join() consumes the JoinHandle.
  • However, you're attempting to pass &mut self to the closure, which brings a lot more issues, boiling down to multiple mutable references
    • This could be solved with e.g. Mutex<A>. However, if you call stop() then that could lead to a deadlock instead.

However, since it was dummy code and you clarified in the comments, let me try to clarify what you meant with a few examples. This includes rewriting your dummy code.

Result after worker is done

If you don't need access to the data while the worker thread is running, then you can make a new struct WorkerData. Then in run() you copy/clone the data you need from A (or as I've renamed it Worker). Then in the closure you finally return data again, so you can acquire it through join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get it through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

You don't really need thread to be Option<JoinHandle<WorkerData>> and could instead just use JoinHandle<WorkerData>, because if you wanted to call run() again, it would be easier to just reassign the variable holding the Worker.

So now we can simplify Worker, removing the Option and change stop to consume thread instead, along with creating new() -> Self in place of run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Shared WorkerData

If you want to retain references to WorkerData between multiple threads, then you'd need to use Arc. Since you additionally want to be able to mutate it, you'll need to use a Mutex.

If you'll only be mutating within a single thread, then you could alternatively use a RwLock, which, compared to a Mutex, allows you to lock and obtain multiple immutable references at the same time.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

If you add a method that returns the data as an Arc<RwLock<WorkerData>>, then cloning the Arc and locking the inner RwLock prior to calling stop() would result in a deadlock.
To avoid that, any data() method should return &WorkerData or &mut WorkerData instead of the Arc. That way you'd be unable to call stop() while holding the lock, so you couldn't cause a deadlock.

Flag to stop worker

If you actually want to stop the worker thread, then you'd have to use a flag to signal it to do so. You can create a flag in the form of a shared AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Multiple threads and multiple tasks

If you want multiple kinds of tasks processed, spread across multiple threads, then here's a more generalized example.

You already mentioned using mpsc. So you can use a Sender and Receiver along with a custom Task and TaskResult enum.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

An example of using the Worker along with sending tasks and receiving task results, would then look like this:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

In a few cases you might need to implement Send for Task and/or TaskResult. Check out "Understanding the Send trait".

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}

Stop and restart an already running thread

Java threads are not restartable. For what you are trying to achieve, you could create a new thread each time, or you could look at an ExecutorService. Just create a single threaded executor (Executors.newSingleThreadExecutor), and submit your runnable to it every time you need it to run.

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(runnable);
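
For comparison, a rough Python analog of the single-threaded executor idea uses concurrent.futures.ThreadPoolExecutor with max_workers=1 (Python is used only to keep the added examples on this page in one language). The job function is a hypothetical stand-in for your runnable.

```python
from concurrent.futures import ThreadPoolExecutor

def job(n):
    # Hypothetical unit of work; replace with whatever must run repeatedly.
    return n * n

# One worker thread is reused for every submission, so nothing is "restarted".
with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(job, 3)
    second = executor.submit(job, 4)
    results = [first.result(), second.result()]
```

Leaving the with block shuts the executor down and waits for pending jobs, so there is no thread left to stop by hand.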

