How to Close a Thread When Multithreading

How to close a thread when multithreading?

To terminate an Thread controlled, using a threadsafe threading.Event():

import threading, time

def Thread_Function(running):
while running.is_set():
print('running')
time.sleep(1)

if __name__ == '__main__':
running = threading.Event()
running.set()

thread = threading.Thread(target=Thread_Function, args=(running,))
thread.start()

time.sleep(1)
print('Event running.clear()')
running.clear()

print('Wait until Thread is terminating')
thread.join()
print("EXIT __main__")

Output:

running  
running
Event running.clear()
Wait until Thread is terminating
EXIT __main__

Tested with Python:3.4.2


Online Demo: reply.it

How to close a thread from within?

When you start a thread, it begins executing a function you give it (if you're extending threading.Thread, the function will be run()). To end the thread, just return from that function.

According to this, you can also call thread.exit(), which will throw an exception that will end the thread silently.

Is there any way to kill a Thread?

It is generally a bad pattern to kill a thread abruptly, in Python, and in any language. Think of the following cases:

  • the thread is holding a critical resource that must be closed properly
  • the thread has created several other threads that must be killed as well.

The nice way of handling this, if you can afford it (if you are managing your own threads), is to have an exit_request flag that each thread checks on a regular interval to see if it is time for it to exit.

For example:

import threading

class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""

def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
self._stop_event = threading.Event()

def stop(self):
self._stop_event.set()

def stopped(self):
return self._stop_event.is_set()

In this code, you should call stop() on the thread when you want it to exit, and wait for the thread to exit properly using join(). The thread should check the stop flag at regular intervals.

There are cases, however, when you really need to kill a thread. An example is when you are wrapping an external library that is busy for long calls, and you want to interrupt it.

The following code allows (with some restrictions) to raise an Exception in a Python thread:

def _async_raise(tid, exctype):
'''Raises an exception in the threads with id tid'''
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
'''A thread class that supports raising an exception in the thread from
another thread.
'''
def _get_my_tid(self):
"""determines this (self's) thread id

CAREFUL: this function is executed in the context of the caller
thread, to get the identity of the thread represented by this
instance.
"""
if not self.isAlive():
raise threading.ThreadError("the thread is not active")

# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id

# no, look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
return tid

# TODO: in python 2.6, there's a simpler way to do: self.ident

raise AssertionError("could not determine the thread's id")

def raiseExc(self, exctype):
"""Raises the given exception type in the context of this thread.

If the thread is busy in a system call (time.sleep(),
socket.accept(), ...), the exception is simply ignored.

If you are sure that your exception should terminate the thread,
one way to ensure that it works is:

t = ThreadWithExc( ... )
...
t.raiseExc( SomeException )
while t.isAlive():
time.sleep( 0.1 )
t.raiseExc( SomeException )

If the exception is to be caught by the thread, you need a way to
check that your thread has caught it.

CAREFUL: this function is executed in the context of the
caller thread, to raise an exception in the context of the
thread represented by this instance.
"""
_async_raise( self._get_my_tid(), exctype )

(Based on Killable Threads by Tomer Filiba. The quote about the return value of PyThreadState_SetAsyncExc appears to be from an old version of Python.)

As noted in the documentation, this is not a magic bullet because if the thread is busy outside the Python interpreter, it will not catch the interruption.

A good usage pattern of this code is to have the thread catch a specific exception and perform the cleanup. That way, you can interrupt a task and still have proper cleanup.

How properly close thread in Python

First, avoid using the low-level API unless you absolutely have to. The threading module is preferred over _thread. In general in Python, avoid anything starting with an underscore.

Now, the method you are looking for is called join. I.e.

import time
from threading import Thread

stop = False

def print_loop():
num = 0
while not stop:
num = num + 1
print(num)
time.sleep(1)

thread = Thread(target=print_loop)
thread.start()

time.sleep(10)

stop = True
thread.join()

How to close Threads in Python?

I do not see why you need a Queue in the first place.

After all in your design every thread just processes one task.

You should be able to pass that task to the thread on creation.

This way you do not need a Queue and you get rid of the while-loop:

class MX_getAAAA_thread(threading.Thread):
def __init__(self, id_domain, mx):
threading.Thread.__init__(self)
self.id_domain = id_domain
self.mx = mx

Then you can rid of the while-loop inside the run-method:

def run(self):
res = dns.resolver.Resolver()
res.lifetime = 1.5
res.timeout = 0.5

try:
answers = res.query(self.mx,'AAAA')
ip_mx = str(answers[0])
except:
ip_mx = "N/A"

with lock:
sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()

print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

Create one thread for each task

for mx in answers:
t = MX_getAAAA_thread(qMX, id_domain, mx)
t.setDaemon(True)
threads.append(t)
t.start()

and join them

for thread in threads:
thread.join()

Python, Stop a Thread

As I was saying in my comment, the easiest way is to use threading.Event to signal your thread when it should exit. That way you can expose the event and let other threads set it while you can check for its state from within your thread and exit on request.

In your case, it could be as simple as:

class Pinger(threading.Thread):

def __init__(self, address='', rate=1):
threading.Thread.__init__(self)
self.kill = threading.Event()
# the rest of your setup...

# etc.

def start_ping(self):
self.timestamp = datetime.datetime.now()
while not self.kill.is_set():
# do your pinging stuff

# etc.

Then whenever you want the thread stopped (like from your UI), just call on it: pinger_instance.kill.set() and you're done. Keep in mind, tho, that it will take some time for it to get killed due to the blocking os.system() call and due to the time.sleep() you have at the end of your Pinger.start_ping() method.

How to terminate threads cleanly in C?

You must not call pthread_exit() in the cleanup functions, because pthread_exit() will also call the cleanup function registered for the thread.

So, in your program, the cleanup function is called recursively and the threads never exit.

About the kill from another terminal, the command kill -9 and the pid of the process should always work because SIGKILL can't be ignored nor caught.

And in the signal handler function, you have to use async-signal-safe functions, printf() isn't async-signal-safe.

Another way to wait for a signal in the main thread is to use sigwait() or sigwaitinfo() instead of pause(), like you did for SIGALARM in a thread. So it won't need to register a handler function, but it will need to block the signals to be caught in all threads.

EDIT: To answer your last comment.

Exiting the threads task2() and task3() with a flag seems to be complex, because the main thread have to send SIGALRM to task2 in order to wake it up, and also signal the condition in order to wake up task3.

I modified your code to try to use a flag, but i may have missed an eventual problem because synchronizing threads may be complex.

In the case of your program, I haven't enough knwoledge to say if it is better to use pthread_cancel() and pthread_testcancel(), or to use flags. However, pthread_cancel() seems to be able to cancel without synchronization problems, threads that are waiting for signals or for a condition.

Using a flag, for task3, there could be the following problem:

  1. task3 check the flag that is 0
  2. main thread set the flag to 1
  3. main thread signal the condition
  4. task3 begin to wait for the condition

In this case, thread task3 won't exit, because it wasn't waiting when the condition was signaled. I'am not sure, but this problem is maybe avoided by protecting the flag with the same mutex we use for the condition. Because when the flag will be set and the condition signaled, task3 will be waiting for the condition or doing work out of the critical section.

I don't know if there may be a problem for task2, for example if the signal is lost due to an internal problem, but normally, the signal will be pending.

Here is the code of my test. I placed 1 as argument for the function pthread_cleanup_pop(), to make the threads execute the cleanup functions.

#include<stdlib.h>
#include<stdio.h>
#include<signal.h>
#include<stdint.h>
#include<pthread.h>

#define FALSE 0
volatile sig_atomic_t g_new_pic_flag=FALSE;
pthread_cond_t g_new_pic_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_new_pic_m = PTHREAD_MUTEX_INITIALIZER;
volatile int g_shutdown_task_3 = 0;

volatile int g_shutdown_task_1_2 = 0;
pthread_mutex_t g_shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
/* FUNCTION DECLARATION */

/*We define thread exit functions so that each pin
is lowered by the thread in which it is used avoiding
race condition between the signal handler of the main thread
and the other threads*/
void exitingThreadTask1(void* arg);
void exitingThreadTask2(void* arg);
void exitingThreadTask3(void* arg);

void* task1(void *arg); //thread function for the motion sensor
void* task2(void *arg); //thread function for the temperature reading
void* task3(void *arg); //thread function to post data on IOT platforms

/*Signal handler to return from pause*/
void sig_handler(int signo);

void err_exit(char err, char *msg) {
printf("\nError: %s\n",msg);
exit(1);
}

int main()
{
int err;
sigset_t omask, mask;
pthread_t thread_motionSensor;
pthread_t thread_tempReading;
pthread_t thread_platformPost;

printf("Created threads IDs\n");
/*
if (wiringPiSetup()<0)
{
printf("WiringPi error\n");
return -1;
}
*/
printf("WiringPi is ok\n");

if (signal(SIGQUIT, sig_handler)==SIG_ERR)
printf("Error on recording SIGQUITHANDLER\n");
if (signal(SIGINT, sig_handler)==SIG_ERR)
printf("Error on recording SIGQUITHANDLER\n");
if (signal(SIGTERM, sig_handler)==SIG_ERR)
printf("Error on recording SIGQUITHANDLER\n");

/*Create a new mask to block all signals for the following thread*/
sigfillset(&mask);
pthread_sigmask(SIG_SETMASK, &mask, &omask);
printf("Trying to create threads\n");
if ((err = pthread_create (&thread_motionSensor, NULL, task1, NULL))!=0)
{
printf("Thread 1 not created: error %d\n", err);
err_exit((const char)err, "pthread_create error");
}
printf("Thread 1 created. Trying to create Thread 2\n");
if((err = pthread_create (&thread_tempReading, NULL, task2, NULL))!=0)
{
printf("Thread 2 not created: error %d\n", err);
err_exit((const char)err, "pthread_create error");
}
printf("Thread 2 created. Trying to create Thread 3\n");
if ((err = pthread_create (&thread_platformPost, NULL, task3, NULL))!=0)
{
printf("Thread 3 not created: error %d %d\n", err);
err_exit((const char)err, "pthread_create error");
}
printf("Thread 3 created\n");
/*The main thread must block the SIGALRM but catch SIGINT
SIGQUIT, SIGTERM, SIgkILL*/
sigemptyset(&omask);
sigaddset(&omask, SIGINT);
sigaddset(&omask, SIGQUIT);
sigaddset(&omask, SIGKILL);
sigaddset(&omask, SIGTERM);

pthread_sigmask(SIG_UNBLOCK, &omask, NULL);
printf("Main thread waiting for signal\n");
pause();
printf("Exit signal received: cancelling threads\n");


pthread_mutex_lock(&g_shutdown_mutex);
g_shutdown_task_1_2 = 1;
pthread_mutex_unlock(&g_shutdown_mutex);
pthread_mutex_lock(&g_new_pic_m);
g_shutdown_task_3 = 1;
pthread_cond_signal(&g_new_pic_cond);
pthread_mutex_unlock(&g_new_pic_m);

pthread_kill(thread_tempReading,SIGALRM);


pthread_join(thread_motionSensor, NULL);
pthread_join(thread_tempReading, NULL);
pthread_join(thread_platformPost, NULL);
printf("Exiting from main thread and process\n");
exit(0);
}

void* task1(void *arg)
{
//INITIALIZING
pthread_cleanup_push(exitingThreadTask1, NULL);
while(1)
{
pthread_mutex_lock(&g_shutdown_mutex);
if(g_shutdown_task_1_2) {
pthread_mutex_unlock(&g_shutdown_mutex);
break;
}
pthread_mutex_unlock(&g_shutdown_mutex);
//do stuff1
sleep(1);
}
pthread_cleanup_pop(1);
pthread_exit(0);

}

void* task2(void *arg)
{
static const unsigned char schedule_time = 5;
int signo, err;
/*
We set a local mask with SIGALARM for the function sigwait
All signals have already been blocked
*/
sigset_t alarm_mask;
sigemptyset(&alarm_mask);
sigaddset(&alarm_mask, SIGALRM);
alarm(schedule_time);
pthread_cleanup_push(exitingThreadTask2, NULL);
while (1)
{
pthread_mutex_lock(&g_shutdown_mutex);
if(g_shutdown_task_1_2) {
pthread_mutex_unlock(&g_shutdown_mutex);
break;
}
pthread_mutex_unlock(&g_shutdown_mutex);

err = sigwait(&alarm_mask, &signo); //signo == SIGALRM check
if (err!=0)
err_exit(err, "sigwait failed\n");

pthread_mutex_lock(&g_shutdown_mutex);
if(g_shutdown_task_1_2) {
pthread_mutex_unlock(&g_shutdown_mutex);
break;
}
pthread_mutex_unlock(&g_shutdown_mutex);

//do stuff
alarm(schedule_time);
}
pthread_cleanup_pop(1);
pthread_exit(0);
}

void* task3(void *arg)
{
pthread_cleanup_push(exitingThreadTask3, NULL);
while(1)
{
pthread_mutex_lock(&g_new_pic_m);
if(g_shutdown_task_3) {
pthread_mutex_unlock(&g_new_pic_m);
break;
}
while(g_new_pic_flag==FALSE)
{
if(g_shutdown_task_3) break;

pthread_cond_wait(&g_new_pic_cond, &g_new_pic_m);

if(g_shutdown_task_3) break;
}
if(g_shutdown_task_3) {
pthread_mutex_unlock(&g_new_pic_m);
break;
}
pthread_mutex_unlock(&g_new_pic_m);
//do stuff
}
pthread_cleanup_pop(1);
pthread_exit(0);

}

void exitingThreadTask1(void* arg)
{
printf("Thread of task 1 exiting\n");
//digitalWrite(OUTPIN, LOW);
//digitalWrite(INPIN, LOW);
printf("Pins lowered\n");
}

void exitingThreadTask2(void* arg)
{
printf("Thread of task 2 exiting\n");
//digitalWrite(DHTPIN, LOW);
printf("Pin lowered\n");
}

void exitingThreadTask3(void* arg)
{
printf("Thread of task 3 exiting\n");
}

void sig_handler(int signo)
{
return;
}


Related Topics



Leave a reply



Submit