What Is the Best Solution to Pause and Resume Pthreads

What is the best solution to pause and resume pthreads?

Actually, this code isn't thread safe. The mutex isn't actually protecting anything, leaving the implied predicate vulnerable to race conditions.

Look at this code -- what is the mutex protecting? What protects the suspend/resume state?

void suspendMe()
{
pthread_mutex_lock(&m_SuspendMutex);
pthread_cond_wait(&m_ResumeCond, &m_SuspendMutex);
}
void resumeMe()
{
pthread_cond_signal(&m_ResumeCond);
pthread_mutex_unlock(&m_SuspendMutex);
}

This is correct:

void suspendMe()
{ // tell the thread to suspend
pthread_mutex_lock(&m_SuspendMutex);
m_SuspendFlag = 1;
pthread_mutex_unlock(&m_SuspendMutex);
}
void resumeMe()
{ // tell the thread to resume
pthread_mutex_lock(&m_SuspendMutex);
m_SuspendFlag = 0;
phtread_cond_broadcast(&m_ResumeCond);
pthread_mutex_unlock(&m_SuspendMutex);
}
void checkSuspend()
{ // if suspended, suspend until resumed
pthread_mutex_lock(&m_SuspendMutex);
while (m_SuspendFlag != 0) pthread_cond_wait(&m_ResumeCond, &m_SuspendMutex);
pthread_mutex_unlock(&m_SuspendMutex);
}

The thread should call checkSuspend at safe points where it can be suspended. Other threads can call suspendMe and resumeMe to suspend/resume the thread.

Notice that now the mutex protects the m_SuspendFlag variable, ensuring that the thread is told to suspend, told to resume, and checks whether it should suspend or stay suspended under protection, making the code thread-safe.

Would it not be better to use 2 separate mutexes here, or is this the correct way to suspend a pthread??

Using two mutexes would defeat the entire point of condition variables. The whole mechanism by which they work is that you can check whether there is something you should wait for and then atomically wait for it without either holding the lock while you wait or having to release the lock and then wait. If you hold the lock while you wait, how can any other thread change the state? And if you release the lock and then wait, what happens if you miss the change in state?

By the way, it almost never makes sense to pause or resume a thread. If you feel like you need to pause a thread from the outside, that just indicates that you coded the thread to do something you didn't actually want it to do. Questions about pausing or resuming threads often indicate an incorrect mental model of thread programming. A thread might need to wait for something, but it shouldn't be "paused" from the outside because it should already know by its own coding when it shouldn't do some particular bit of work.

How to suspend and resume a POSIX thread in C++?

After a little modification of above code , it seems working . Thanks guy for pointing out issues on above code, the changes are as follow.

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include<iostream>
#define on 1
#define off 0
void gpio_write(int fd, int value);
void* led_Flash(void* args);

class PThread {
public:

pthread_t threadID;
volatile int suspended;
int fd;
pthread_mutex_t lock;
PThread(int fd1)
{
this->fd=fd1;
this->suspended =1; //Initial state: suspend blinking untill resume call
pthread_mutex_init(&this->lock,NULL);
pthread_create(&this->threadID, NULL, led_Flash, (void*)this );

}
~PThread()
{
pthread_join(this->threadID , NULL);
pthread_mutex_destroy(&this->lock);
}

void suspendBlink() {
pthread_mutex_lock(&this->lock);
this->suspended = 1;
pthread_mutex_unlock(&this->lock);
}

void resumeBlink() {
pthread_mutex_lock(&this->lock);
this->suspended = 0;
pthread_mutex_unlock(&this->lock);
}
};

void gpio_write(int fd, int value)
{
if(value!=0)
printf("%d: on\n", fd);
else
printf("%d: off\n", fd);
}

void* led_Flash(void* args)
{
PThread* pt= (PThread*) args;
int fd= pt->fd;

while(1)
{
if(!(pt->suspended))
{
gpio_write(fd,on);
usleep(1);
gpio_write(fd,off);
usleep(1);
}
}

return NULL;
}

int main()
{
//Create threads with Initial state: suspend/stop blinking untill resume call
class PThread redLED(1);
class PThread amberLED(2);
class PThread greenLED(3);

// Start blinking
redLED.resumeBlink();
amberLED.resumeBlink();
greenLED.resumeBlink();
sleep(5);

// suspend/stop blinking
amberLED.suspendBlink();

sleep(5);

redLED.suspendBlink();

sleep(5);

amberLED.suspendBlink();

sleep(5);

redLED.resumeBlink();

pthread_exit(NULL);

return 0;
}

How to sleep or pause a PThread in c on Linux

You can use a mutex, condition variable, and a shared flag variable to do this. Let's assume these are defined globally:

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int play = 0;

You could structure your playback code like this:

for(;;) { /* Playback loop */
pthread_mutex_lock(&lock);
while(!play) { /* We're paused */
pthread_cond_wait(&cond, &lock); /* Wait for play signal */
}
pthread_mutex_unlock(&lock);
/* Continue playback */
}

Then, to play you can do this:

pthread_mutex_lock(&lock);
play = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&lock);

And to pause:

pthread_mutex_lock(&lock);
play = 0;
pthread_mutex_unlock(&lock);

How to pause and resume thread?

There is no one right way to pause and resume a thread.

First, there is no way at all to do it without the cooperation of the code that thread is running. Otherwise, disaster could occur if you pause a thread while it holds a lock that the thread that would resume it needs to acquire before it can resume. So you must have the cooperation of the code the thread you want to pause is running.

With the thread's cooperation, you can do it however you like. You can have an atomic bool that the thread periodically checks. You can just not give the thread work to do if it's designed to pause when it has no work to do.

There's no one right way and it entirely depends on other design decisions. Primarily, it depends on what that code is doing and why you want to pause it.

One other thing that is extremely important: Any time you feel you need to reach into a thread from outside and make it do or not do something, that should be a sign to you that you coded the thread wrong in the first place. A thread should know what work it needs to do and when it needs to not do work by its own design. If something else has to "reach in" intrusively and make it do or not do things, you should re-examine the design decisions that got you to that point.

And to your specific point:

I want to use pthread_kill() function with SIGSTOP and SIGCONT signals to thread.

That couldn't possibly work. What if the thread happens to hold an internal library lock that needs to be acquired to return from pthread_kill? The thread trying to pause it would also pause itself. In any event, SIGSTOP is defined as stopping a process, not a thread.

Pthreads signal-like pause?

In pthread / POSIX Thread stack, you may use pthread_kill to send signals to individual threads; but unfortunately that doesnt include "stop", "continue", or "terminate". See the Notes here

If you are only interested in pausing the thread; a crafty formula of mutexes and conditional variables are the best way forward.

How to properly suspend threads?

I believe gmch's answer should solve the original question. However, not all pthread implementations include pthread_barrier_t and the related functions (as they are an optional part of the POSIX threads specs), so here is the custom barrier implementation I mentioned in a comment to the original question.

(Note that there are other ways to suspend/resume threads asynchronously, during normal operation, and without co-operation from the threads themselves. One way to implement that is to use one or two realtime signals, and a signal handler that blocks in sigsuspend(), waiting for the complementary "continue" signal. The controlling thread will have to use pthread_kill() or pthread_sigqueue() to send the pausing and continuing signals to each thread involved. The threads are minimally affected; aside from possible EINTR errors from blocking syscalls (as signal delivery interrupts blocking syscalls), the threads just don't do any progress -- just as if they weren't scheduled for a while. Because of that, there should not be any issues with respect to the threads getting paused and continued at slightly different times. If you are interested in this method, leave a comment, and I could try and show an example implementation of that, too.)

Perhaps this will be of use to someone else needing a pause-able custom barrier implementation, or perhaps as a basis of their own custom barrier.

Edited to add DRAINING mode, when threads are expected to quit. In your worker loop, use do { ... } while (!barrier_wait(&barrier));

barrier.h:

#ifndef   BARRIER_H
#define BARRIER_H
#include <pthread.h>
#include <errno.h>

typedef enum {
INVALID = -1,
RUNNING = 0,
PAUSED = 1,
DRAINING = 2
} barrier_state;

typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
barrier_state state;
int threads; /* Number of participants */
int waiting; /* Number of participants waiting */
} barrier;

/** barrier_drain() - Mark barrier so that threads will know to exit
* @b: pointer to barrier
* @ids: pthread_t's for the threads to wait on, or NULL
* @retvals: return values from the threads, or NULL
* This function marks the barrier such that all threads arriving
* at it will return ETIMEDOUT.
* If @ids is specified, the threads will be joined.
* Returns 0 if successful, errno error code otherwise.
*/
static int barrier_drain(barrier *const b, pthread_t *const ids, void **const retvals)
{
int result, threads;
void *retval;

if (!b || b->threads < 0)
return errno = EINVAL;

result = pthread_mutex_lock(&b->mutex);
if (result)
return errno = result;

b->state = DRAINING;
pthread_cond_broadcast(&b->cond);

threads = b->threads;
b->threads = 0;

pthread_mutex_unlock(&b->mutex);

while (threads-->0) {
result = pthread_join(ids[threads], &retval);
if (result)
return errno = result;
if (retvals)
retvals[threads] = retval;
}

return errno = 0;
}

/** barrier_pause() - Mark barrier to pause threads in the barrier
* @b: pointer to barrier
* This function marks the barrier such that all threads arriving
* in it will wait in the barrier, until barrier_continue() is
* called on it. If barrier_continue() is called before all threads
* have arrived on the barrier, the barrier will operate normally;
* i.e. the threads will continue only when all threads have arrived
* at the barrier.
* Returns 0 if successful, errno error code otherwise.
*/
static int barrier_pause(barrier *const b)
{
int result;

if (!b || b->threads < 1)
return errno = EINVAL;

result = pthread_mutex_lock(&b->mutex);
if (result)
return errno = result;

if (b->state != PAUSED && b->state != RUNNING) {
pthread_mutex_unlock(&b->mutex);
return errno = EPERM;
}

b->state = PAUSED;
pthread_mutex_unlock(&b->mutex);
return errno = 0;
}

/** barrier_continue() - Unpause barrier
* @b: Pointer to barrier
* This function lets the barrier operate normally.
* If all threads are already waiting in the barrier,
* it lets them proceed immediately. Otherwise, the
* threads will continue when all threads have arrived
* at the barrier.
* Returns 0 if success, errno error code otherwise.
*/
static int barrier_continue(barrier *const b)
{
int result;

if (!b || b->threads < 0)
return errno = EINVAL;

result = pthread_mutex_lock(&b->mutex);
if (result)
return errno = result;

if (b->state != PAUSED) {
pthread_mutex_unlock(&b->mutex);
return errno = EPERM;
}

b->state = RUNNING;

if (b->waiting >= b->threads)
pthread_cond_broadcast(&b->cond);

pthread_mutex_unlock(&b->mutex);

return errno = 0;
}

/** barrier_wait() - Wait on the barrier
* @b: Pointer to barrier
* Each thread participating in the barrier
* must call this function.
* Callers will block (wait) in this function,
* until all threads have arrived.
* If the barrier is paused, the threads will
* wait until barrier_continue() is called on
* the barrier, otherwise they will continue
* when the final thread arrives to the barrier.
* Returns 0 if success, errno error code otherwise.
* Returns ETIMEDOUT if the thread should exit.
*/
static int barrier_wait(barrier *const b)
{
int result;

if (!b || b->threads < 0)
return errno = EINVAL;

result = pthread_mutex_lock(&b->mutex);
if (result)
return errno =result;

if (b->state == INVALID) {
pthread_mutex_unlock(&b->mutex);
return errno = EPERM;
} else
if (b->state == DRAINING) {
pthread_mutex_unlock(&b->mutex);
return errno = ETIMEDOUT;
}

b->waiting++;

if (b->state == RUNNING && b->waiting >= b->threads)
pthread_cond_broadcast(&b->cond);
else
pthread_cond_wait(&b->cond, &b->mutex);

b->waiting--;
pthread_mutex_unlock(&b->mutex);

return errno = 0;
}

/** barrier_destroy() - Destroy a previously initialized barrier
* @b: Pointer to barrier
* Returns zero if success, errno error code otherwise.
*/
static int barrier_destroy(barrier *const b)
{
int result;

if (!b || b->threads < 0)
return errno = EINVAL;

b->state = INVALID;
b->threads = -1;
b->waiting = -1;

result = pthread_cond_destroy(&b->cond);
if (result)
return errno = result;

result = pthread_mutex_destroy(&b->mutex);
if (result)
return errno = result;

return errno = 0;
}

/** barrier_init() - Initialize a barrier
* @b: Pointer to barrier
* @threads: Number of threads to participate in barrier
* Returns 0 if success, errno error code otherwise.
*/
static int barrier_init(barrier *const b, const int threads)
{
int result;

if (!b || threads < 1)
return errno = EINVAL;

result = pthread_mutex_init(&b->mutex, NULL);
if (result)
return errno = result;

result = pthread_cond_init(&b->cond, NULL);
if (result)
return errno = result;

b->state = RUNNING;
b->threads = threads;
b->waiting = 0;

return errno = 0;
}

#endif /* BARRIER_H */

The logic is quite simple. All threads waiting in the barrier wait on the cond condition variable. If the barrier operates normally (state==RUNNING), the final thread arriving at the barrier will broadcast on the condition variable instead of waiting on it, thus waking up all other threads.

If the barrier is paused (state==PAUSED), even the final thread arriving at the barrier will wait on the condition variable.

When barrier_pause() is called, the barrier state is changed to paused. There may be zero or more threads waiting on the condition variable, and that is okay: only the final thread arriving at the barrier has a special role, and that thread cannot have yet arrived. (If it had, it'd have emptied the barrier already.)

When barrier_continue() is called, the barrier state is changed to normal (state==RUNNING). If all threads are waiting on the condition variable, they are released by broadcasting on the condition variable. Otherwise, the final thread arriving at the barrier will broadcast on the condition variable and release the waiting threads normally.

Note that barrier_pause() and barrier_continue() do not wait for the barrier to become full or to drain. It only blocks on the mutex, and the functions only hold it for very short periods at a time. (In other words, they may block for a short time, but will not wait for the barrier to reach any specific situation.)

If the barrier is draining (state==DRAINING), threads arriving at the barrier return immediately with errno==ETIMEDOUT. For simplicity, all the barrier functions now unconditionally set errno (to 0 if success, errno code if error, ETIMEDOUT if draining).

The mutex protects the barrier fields so that only one thread may access the fields at once. In particular, only one thread can arrive at the barrier at the same time, due to the mutex.

One complicated situation exists: the loop body the barrier is used in might be so short, or there might be so many threads, that threads start arriving at the next iteration of the barrier even before all threads from the previous iteration have left it.

According to POSIX.1-2004, pthread_cond_broadcast() "shall unblock all threads currently blocking on the specified condition variable". Even though their wakeups will be sequential -- as each one will acquire the mutex in turn --, only those threads that were blocked on it when pthread_cond_broadcast() was called will be woken up.

So, if the implementation follows POSIX semantics with respect to condition variables, woken threads can (even immediately!) re-wait on the condition variable, waiting for the next broadcast or signal: the "old" and "new" waiters are separate sets. This use case is actually quite typical, and all POSIX implementations I've heard of do allow that -- they do not wake up threads that started waiting on the condition variable after the last pthread_cond_broadcast().

If we can rely on POSIX condition variable wakeup semantics, it means the above barrier implementation should work reliably, including in the case where threads arrive at the barrier (for the next iteration), even before all threads (from the previous iteration) have left the barrier.

(Note that the known "spurious wakeups" issue only affects pthread_cond_signal(); i.e. when calling pthread_cond_signal() more than one thread may be woken up. Here, we wake up all threads using pthread_cond_broadcast(). We rely on it only waking current waiters, and not any future waiters.)


Here is a POSIX.1-2001 implementation for suspending and resuming threads asynchronously, without any co-operation from the target thread(s).

This uses two signals, one for suspending a thread, and another for resuming it. For maximum compatibility, I did not use GNU C extensions or POSIX.1b realtime signals. Both signals save and restore errno, so that the impact to the suspended threads would be minimal.

Note, however, that the functions listed in man 7 signal, "Interruption of system calls and library functions by signal handlers" section, after the "The following interfaces are never restarted after being interrupted by a signal handler" paragraph, will return errno==EINTR when suspended/resumed. This means you will have to use the traditional do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); loop, instead of just result = FUNCTION(...);.

The suspend_threads() and resume_threads() calls are not synchronous. The threads will be suspended/resumed either before, or sometime after, the function calls return. Also, suspend and resume signals sent from outside the process itself may affect the threads; it depends on if the kernel uses one of the target threads to deliver such signals. (This approach cannot ignore signals sent by other processes.)

Testing indicates that in practice, this suspend/resume functionality is quite reliable, assuming no outside interference (by sending signals caught by the target threads from another process). However, it is not very robust, and there are very few guarantees on its operation, but it might suffice for some implementations.

suspend-resume.h:

#ifndef   SUSPEND_RESUME_H
#define SUSPEND_RESUME_H

#if !defined(_POSIX_C_SOURCE) && !defined(POSIX_SOURCE)
#error This requires POSIX support (define _POSIX_C_SOURCE).
#endif

#include <signal.h>
#include <errno.h>
#include <pthread.h>

#define SUSPEND_SIGNAL SIGUSR1
#define RESUME_SIGNAL SIGUSR2

/* Resume signal handler.
*/
static void resume_handler(int signum, siginfo_t *info, void *context)
{
/* The delivery of the resume signal is the key point.
* The actual signal handler does nothing. */
return;
}

/* Suspend signal handler.
*/
static void suspend_handler(int signum, siginfo_t *info, void *context)
{
sigset_t resumeset;
int saved_errno;

if (!info || info->si_signo != SUSPEND_SIGNAL)
return;

/* Save errno to keep it unchanged in the interrupted thread. */
saved_errno = errno;

/* Block until suspend or resume signal received. */
sigfillset(&resumeset);
sigdelset(&resumeset, SUSPEND_SIGNAL);
sigdelset(&resumeset, RESUME_SIGNAL);
sigsuspend(&resumeset);

/* Restore errno. */
errno = saved_errno;
}

/* Install signal handlers.
*/
static int init_suspend_resume(void)
{
struct sigaction act;

sigemptyset(&act.sa_mask);
sigaddset(&act.sa_mask, SUSPEND_SIGNAL);
sigaddset(&act.sa_mask, RESUME_SIGNAL);
act.sa_flags = SA_RESTART | SA_SIGINFO;

act.sa_sigaction = resume_handler;
if (sigaction(RESUME_SIGNAL, &act, NULL))
return errno;

act.sa_sigaction = suspend_handler;
if (sigaction(SUSPEND_SIGNAL, &act, NULL))
return errno;

return 0;
}

/* Suspend one or more threads.
*/
static int suspend_threads(const pthread_t *const identifier, const int count)
{
int i, result, retval = 0;

if (!identifier || count < 1)
return errno = EINVAL;

for (i = 0; i < count; i++) {
result = pthread_kill(identifier[i], SUSPEND_SIGNAL);
if (result && !retval)
retval = result;
}

return errno = retval;
}

/* Resume one or more threads.
*/
static int resume_threads(const pthread_t *const identifier, const int count)
{
int i, result, retval = 0;

if (!identifier || count < 1)
return errno = EINVAL;

for (i = 0; i < count; i++) {
result = pthread_kill(identifier[i], RESUME_SIGNAL);
if (result && !retval)
retval = result;
}

return errno = retval;
}

#endif /* SUSPEND_RESUME_H */

Questions?

pthread - Pausing/Unpausing all threads

You may use dedicated thread for wait signals using sigwait. When signal is received, waiting is returned, and given thread may inform other threads within normal code (not a signal handler).

Assuming you have functions for pause and unpause threads like these

int paused;
pthread_mutex m;
pthread_cond cond;

void pause_threads(void)
{
pthread_mutex_lock(&m);
paused = 1;
pthread_mutex_unlock(&m);
}

void unpause_threads(void)
{
pthread_mutex_lock(&m);
paused = 0;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&m);
}

dedicated thread can be implemented in this way:

// Block signals for wait
sigset_t usr_set;

sigemptyset(&usr_set);
sigaddset(&usr_set, SIGUSR1);
sigaddset(&usr_set, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &usr_set, NULL);

// If other threads will be created from given one, they will share signal handling.
// Otherwise actions above should be repeated for new threads.

int sig;
// Repeatedly wait for signals arriving.
while(!sigwait(&usr_set, &sig)) {
if(sig == SIGUSR1) {
pause_threads();
}
else {
unpause_threads();
}
}


Related Topics



Leave a reply



Submit