Race Condition in pthread_once()

Race condition in pthread_once()?

Indeed, there is a race condition between the destructor of the local promise object (at the end of the constructor) and the call to set_value() from the thread. That is, set_value() wakes the main thread, which promptly destroys the promise object while set_value() has not yet finished, and the program deadlocks.

Reading the C++11 standard, I'm not sure if your use is allowed:

void promise<void>::set_value();

Effects: atomically stores the value r in the shared state and makes that state ready.

But somewhere else:

The set_value, set_exception, set_value_at_thread_exit, and set_exception_at_thread_exit member functions behave as though they acquire a single mutex associated with the promise object while updating the promise object.

Are set_value() calls supposed to be atomic with regard to other member functions, such as the destructor?

IMHO, I'd say no. The effect would be comparable to destroying a mutex while another thread still holds it locked. The result is undefined.

The solution would be to ensure that p is not destroyed until set_value() has finished. Two solutions that I can think of:

  1. Make p a member of the class, just as Michael Burr suggested in the other answer.

  2. Move the promise into the thread.

In the constructor:

std::promise<void> p;
std::future<void> f = p.get_future();
_thread = std::thread(&foo::run, this, std::move(p));

BTW, you don't need the call to std::bind (the std::thread constructor already forwards extra arguments to the callable), nor a call to std::move to move the thread (the right-hand side is already an rvalue). The call to std::move on the promise is mandatory, though.

And the thread function receives the promise by value (moved in), not by reference:

void run(std::promise<void> p)
{
    p.set_value();
}

I think that this is precisely why C++11 defines two different classes, std::promise and std::future: you move the promise into the thread, but you keep the future to recover the result.
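
Putting the pieces together, here is a minimal compilable sketch of the pattern; everything in foo other than the constructor body and run() is my reconstruction, not code from the question:

#include <future>
#include <thread>

class foo
{
public:
    foo()
    {
        std::promise<void> p;
        std::future<void> f = p.get_future();
        // Move the promise into the thread; keep the future here.
        _thread = std::thread(&foo::run, this, std::move(p));
        f.wait(); // safe: p now lives inside the thread, not on this stack
    }
    ~foo()
    {
        if (_thread.joinable())
            _thread.join();
    }
private:
    void run(std::promise<void> p)
    {
        // ...thread-side initialization would go here...
        p.set_value(); // the promise dies in this thread, after set_value()
    }
    std::thread _thread;
};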

Multi-threaded and templated singleton race condition

The code you show is designed to protect/serialize the initialization of

static T instanceObject;

as defined in Singleton<T>::onceFunction(). In order to achieve that, you use pthread_once(). However, you have declared the pthread_once_t flag once_control at function/block scope with static storage...

static pthread_once_t  once_control = PTHREAD_ONCE_INIT;

Hence, I think the initialization of once_control suffers from the same race condition that your code is designed to solve for instanceObject.

Make once_control a static member of Singleton<T> instead.

Note: Your code also assumes the assignment...

Singleton<T>::instance = &instanceObject;

is atomic.
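
A sketch of that change; the overall shape of Singleton<T> is a hypothetical reconstruction of the code in the question, but it shows once_control (and instance) as class-scope statics:

#include <pthread.h>

template <typename T>
class Singleton
{
public:
    static T *getInstance()
    {
        pthread_once(&once_control, onceFunction);
        return instance;
    }
private:
    static void onceFunction()
    {
        static T instanceObject;
        instance = &instanceObject;
    }
    static pthread_once_t once_control; // class scope, not block scope
    static T *instance;
};

template <typename T>
pthread_once_t Singleton<T>::once_control = PTHREAD_ONCE_INIT;

template <typename T>
T *Singleton<T>::instance = 0;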

Solve race condition during semaphore initialization

If the real use of the semaphore is really as a mutex, use just that: a pthread_mutex_t. These can be initialized statically, so your problem disappears.

The syntax would be

pthread_mutex_t global_mutex = PTHREAD_MUTEX_INITIALIZER;

If you really need a dynamic initialization of a global object, have a look at pthread_once. This is the type (pthread_once_t) and function provided by POSIX for exactly this task.
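
A sketch of that approach for a semaphore; the function names are placeholders of mine, and error handling is elided:

#include <pthread.h>
#include <semaphore.h>

static sem_t global_sem;
static pthread_once_t sem_once = PTHREAD_ONCE_INIT;

static void init_global_sem(void)
{
    sem_init(&global_sem, 0, 1); /* 0 = not shared between processes */
}

sem_t *get_global_sem(void)
{
    pthread_once(&sem_once, init_global_sem); /* first caller initializes */
    return &global_sem;
}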

Non-obvious lifetime issue with std::promise and std::future

std::promise is just like any other object: you can only access it from one thread at a time. In this case, you are calling set_value() and destroying the object from separate threads without sufficient synchronization: nowhere in the spec does it say that set_value will not touch the promise object after making the future ready.

However, since this future is used for a one-shot synchronization, you don't need to do that anyway: create the promise/future pair right in run(), and pass the promise to the thread:

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
      : _job(job)
      , _d(d)
    {
    }

    void run()
    {
        std::promise<void> p;
        std::future<void> f = p.get_future();

        _d.post(
            [&]{
                cb(std::move(p));
            });

        f.wait();
    }

private:
    void cb(std::promise<void> p)
    {
        _job();
        p.set_value();
    }

    std::function<void()> _job;
    dispatcher& _d;
};
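
dispatcher comes from the question; if you want to try the above in isolation, a minimal stand-in could look like this (an assumption on my part: all post() has to do is run the callable on another thread):

#include <functional>
#include <thread>

struct dispatcher
{
    // Minimal stand-in: run each posted job on its own detached thread.
    // A real dispatcher would queue jobs onto one long-lived worker thread.
    void post(std::function<void()> job)
    {
        std::thread(std::move(job)).detach();
    }
};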

Pthread data race that drives me crazy

You're not initializing your mutexes and condition variables; using them uninitialized in the pthread APIs is undefined behaviour. There are two ways to fix this. The simplest is to use the static initializers:

pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t filled = PTHREAD_COND_INITIALIZER;
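
The other way is run-time initialization, which you need if the objects have dynamic storage duration or require non-default attributes; it must happen exactly once, before any thread uses them:

pthread_mutex_init(&buffer_mutex, NULL);
pthread_mutex_init(&print_mutex, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&filled, NULL);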

Unrelated, but worth mentioning: the last_done mechanism is not necessary. The same can be achieved with just the buf_empty and prod_done states. Specifically:

void *eval_positioning(void *tid)
{
    int qpositions[N]; // on stack (thread-private)

    while (1)
    {
        pthread_mutex_lock(&buffer_mutex);

        // while still producing *and* the buffer is empty
        while (!global.prod_done && global.buf_empty)
            pthread_cond_wait(&filled, &buffer_mutex);

        // if both are true, we're done. nothing to process, and
        // there never will be (e.g. prod_done)
        if (global.prod_done && global.buf_empty)
        {
            // signal anyone else waiting on that mutex+cvar
            pthread_cond_signal(&filled);
            break;
        }

        // if we have a buffer to process (even if prod_done is true)
        else if (!global.buf_empty)
        {
            // make local copy of buffer
            memcpy(qpositions, global.positions, sizeof qpositions);
            ++consumptions;

            // mark global buffer as ready-to-receive
            global.buf_empty = 1;
            pthread_cond_signal(&empty);
            pthread_mutex_unlock(&buffer_mutex);

            // if validated...
            if (valid_rows(qpositions) && valid_columns(qpositions) && valid_diagonals(qpositions))
            {
                // record and bump the printout counter.
                pthread_mutex_lock(&print_mutex);
                int row = printouts.top++;
                pthread_mutex_unlock(&print_mutex);

                // this need not be protected by the mutex. we "own"
                // this now, and can just blast away.
                memcpy(printouts.qpositions[row], qpositions, sizeof qpositions);
            }
        }
        else
        {
            pthread_mutex_unlock(&buffer_mutex);
        }
    }

    // make sure we unlock this
    pthread_mutex_unlock(&buffer_mutex);

    return tid;
}

With proper initialization of the synchronization objects, and the above eval processor, this is the consistent output:

Output

1 done
2 done
Solution 1: [ 2 8 9 15 ]
Placement:
[ 13 14 Q 16 ]
[ Q 10 11 12 ]
[ 5 6 7 Q ]
[ 1 Q 3 4 ]

Solution 2: [ 3 5 12 14 ]
Placement:
[ 13 Q 15 16 ]
[ 9 10 11 Q ]
[ Q 6 7 8 ]
[ 1 2 Q 4 ]

board: 4x4, workers: 2 (+1), exec time: 0.013001s, solutions: 2
productions: 1820
consumptions: 1820

Apologies for the puny laptop performance numbers

POSIX Threads, unique execution

pthread_cond_wait() and pthread_cond_broadcast()

In conjunction with pthread_mutex_lock() and pthread_mutex_unlock(), the pthread_cond_wait() and pthread_cond_broadcast() functions are the key to achieving the required functionality. However, considerable care is required to get the predicate right.

/*
** Objective: N threads cooperate on M cycles or iterations of a task.
** A new random number is needed for each cycle, but all threads must
** use the same random number on each cycle.
** Any thread may evaluate the new random number.
*/

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "stderr.h"

#ifndef NUM_THREADS
#define NUM_THREADS 3
#endif
#ifndef NUM_CYCLES
#define NUM_CYCLES 5
#endif

enum { MAX_THREADS = NUM_THREADS };
enum { MAX_CYCLES = NUM_CYCLES };

static pthread_mutex_t mtx_waiting = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cnd_waiting = PTHREAD_COND_INITIALIZER;
static int num_waiting = 0;
static int cycle = -1;

static float gl_rand = 0;
static long gl_long = 0;

static float next_iteration_random_number(int tid, int iteration)
{
    pthread_mutex_lock(&mtx_waiting);
    assert(cycle == iteration || cycle == iteration - 1);
    num_waiting++;
    printf("-->> TID %d, I = %d (C = %d, W = %d)\n",
           tid, iteration, cycle, num_waiting);
    while (cycle != iteration && num_waiting != MAX_THREADS)
    {
        assert(num_waiting > 0 && num_waiting <= MAX_THREADS);
        printf("-CW- TID %d, I = %d (C = %d, W = %d)\n",
               tid, iteration, cycle, num_waiting);
        pthread_cond_wait(&cnd_waiting, &mtx_waiting);
    }
    assert(cycle == iteration || num_waiting == MAX_THREADS);
    printf("---- TID %d, I = %d (C = %d, W = %d)\n",
           tid, iteration, cycle, num_waiting);

    if (cycle != iteration)
    {
        gl_long = lrand48();
        gl_rand = (float)gl_long;
        num_waiting = 0;
        cycle = iteration;
        printf("---- TID %d generates cycle %d: L = %ld, F = %g\n",
               tid, cycle, gl_long, gl_rand);
        pthread_cond_broadcast(&cnd_waiting);
    }

    printf("<<-- TID %d, I = %d (C = %d, W = %d) L = %ld, F = %g\n",
           tid, iteration, cycle, num_waiting, gl_long, gl_rand);
    pthread_mutex_unlock(&mtx_waiting);
    return gl_rand;
}

static void *thread_function(void *vp)
{
    int tid = (int)(uintptr_t)vp; // Thuggish!

    for (int i = 0; i < MAX_CYCLES; i++)
    {
        float f = next_iteration_random_number(tid, i);
        printf("TID %d at work: I = %d, F = %g\n", tid, i, f);
        fflush(stdout);
        struct timespec rq;
        rq.tv_sec = 0;
        rq.tv_nsec = (((gl_long & 0xFF) + (0xF * tid))) % 200 * 5000000; // under 1 second
        assert(rq.tv_nsec >= 0 && rq.tv_nsec < 1000000000);
        nanosleep(&rq, 0);
    }

    return 0;
}

int main(int argc, char **argv)
{
    err_setarg0(argv[0]);
    assert(argc == 1);

    pthread_t thread[MAX_THREADS];

    for (int i = 0; i < MAX_THREADS; i++)
    {
        int rc = pthread_create(&thread[i], 0, thread_function, (void *)(uintptr_t)i);
        if (rc != 0)
        {
            errno = rc;
            err_syserr("failed to create thread %d", i);
        }
    }

    for (int i = 0; i < MAX_THREADS; i++)
    {
        void *vp;
        int rc = pthread_join(thread[i], &vp);
        if (rc != 0)
        {
            errno = rc;
            err_syserr("Failed to join TID %d", i);
        }
        printf("TID %d returned %p\n", i, vp);
    }

    return 0;
}


Source at GitHub. Library code in libsoq. The routines starting err_ are declared in header stderr.h and the supporting code is in stderr.c. These greatly simplify error reporting.

The main() function is trivial; it launches NUM_THREADS threads (3 by default), and then waits to join them. The only tricky bit is relaying the thread number as an argument to the thread function. The code converts the integer value into a uintptr_t, then coerces that to a void *; the called function undoes this sequence.

The per-thread function is not complex. It collects the thread number from its argument, then enters into a loop to iterate through the required number of cycles. On each iteration, it passes the iteration number (and the thread number) to next_iteration_random_number(), which coordinates the random number generation. The thread prints the data, then uses nanosleep() to sleep for a not easily discernible amount of time that is under 1 second.

The interesting code is in next_iteration_random_number(). First, the thread locks the mutex that controls the shared information. It then increments the number of waiting threads. If the current cycle is not the same as the iteration it is dealing with, and the number of waiting threads is not equal to the total number of threads, then this thread calls pthread_cond_wait() to sleep until broadcast to. When the thread gets out of the loop (either by waking up after a broadcast, or because it never entered the loop), it checks whether the current cycle is the iteration it is expecting. If not, it must be the last thread to arrive for this cycle, so it generates the random number, does the housekeeping to ensure that the other threads will know what is happening, and then uses pthread_cond_broadcast() to signal all the sleeping threads to awaken. They can't actually wake yet; the current thread still holds the mutex. It reports what it is doing, unlocks the mutex, and returns the random number. The other threads detect that the cycle number is the same as the iteration they were waiting for, so they can each print the information, unlock the mutex, and return the number.

The key point is that each thread must know which iteration it is expecting to work on, and must sleep appropriately when that is not yet the current iteration. There are assertions and printing operations aplenty in the code to help understand what's going on.

Wholly separately: lrand48() returns a long, but with a value in the range [0, 2^31), so it is a 32-bit value even when long is a 64-bit type. That means there is a very good chance that it returns values which cannot be represented accurately in a float, so the gl_rand = (float)lrand48(); snippet from the question is dubious. (The sample code also records the long value in gl_long and uses that on occasion.)

The code compiles cleanly on a Mac running macOS Sierra 10.12.3 with GCC 6.3.0 and compilation options set to fussy:

$ gcc -O3 -g -I../../inc -std=c11 -Wall -Wextra -Werror -Wmissing-prototypes \
> -Wstrict-prototypes -Wold-style-definition pthrd37.c -o pthrd37 \
> -L../../lib -lsoq
$

Sample run

Program configured with 3 threads and 5 cycles:

-->> TID 0, I = 0 (C = -1, W = 1)
-CW- TID 0, I = 0 (C = -1, W = 1)
-->> TID 2, I = 0 (C = -1, W = 2)
-CW- TID 2, I = 0 (C = -1, W = 2)
-->> TID 1, I = 0 (C = -1, W = 3)
---- TID 1, I = 0 (C = -1, W = 3)
---- TID 1 generates cycle 0: L = 851401618, F = 8.51402e+08
<<-- TID 1, I = 0 (C = 0, W = 0) L = 851401618, F = 8.51402e+08
TID 1 at work: I = 0, F = 8.51402e+08
---- TID 0, I = 0 (C = 0, W = 0)
<<-- TID 0, I = 0 (C = 0, W = 0) L = 851401618, F = 8.51402e+08
TID 0 at work: I = 0, F = 8.51402e+08
---- TID 2, I = 0 (C = 0, W = 0)
<<-- TID 2, I = 0 (C = 0, W = 0) L = 851401618, F = 8.51402e+08
TID 2 at work: I = 0, F = 8.51402e+08
-->> TID 1, I = 1 (C = 0, W = 1)
-CW- TID 1, I = 1 (C = 0, W = 1)
-->> TID 0, I = 1 (C = 0, W = 2)
-CW- TID 0, I = 1 (C = 0, W = 2)
-->> TID 2, I = 1 (C = 0, W = 3)
---- TID 2, I = 1 (C = 0, W = 3)
---- TID 2 generates cycle 1: L = 1804928587, F = 1.80493e+09
<<-- TID 2, I = 1 (C = 1, W = 0) L = 1804928587, F = 1.80493e+09
TID 2 at work: I = 1, F = 1.80493e+09
---- TID 1, I = 1 (C = 1, W = 0)
<<-- TID 1, I = 1 (C = 1, W = 0) L = 1804928587, F = 1.80493e+09
TID 1 at work: I = 1, F = 1.80493e+09
---- TID 0, I = 1 (C = 1, W = 0)
<<-- TID 0, I = 1 (C = 1, W = 0) L = 1804928587, F = 1.80493e+09
TID 0 at work: I = 1, F = 1.80493e+09
-->> TID 2, I = 2 (C = 1, W = 1)
-CW- TID 2, I = 2 (C = 1, W = 1)
-->> TID 1, I = 2 (C = 1, W = 2)
-CW- TID 1, I = 2 (C = 1, W = 2)
-->> TID 0, I = 2 (C = 1, W = 3)
---- TID 0, I = 2 (C = 1, W = 3)
---- TID 0 generates cycle 2: L = 758783491, F = 7.58783e+08
<<-- TID 0, I = 2 (C = 2, W = 0) L = 758783491, F = 7.58783e+08
TID 0 at work: I = 2, F = 7.58783e+08
---- TID 2, I = 2 (C = 2, W = 0)
<<-- TID 2, I = 2 (C = 2, W = 0) L = 758783491, F = 7.58783e+08
TID 2 at work: I = 2, F = 7.58783e+08
---- TID 1, I = 2 (C = 2, W = 0)
<<-- TID 1, I = 2 (C = 2, W = 0) L = 758783491, F = 7.58783e+08
TID 1 at work: I = 2, F = 7.58783e+08
-->> TID 0, I = 3 (C = 2, W = 1)
-CW- TID 0, I = 3 (C = 2, W = 1)
-->> TID 2, I = 3 (C = 2, W = 2)
-CW- TID 2, I = 3 (C = 2, W = 2)
-->> TID 1, I = 3 (C = 2, W = 3)
---- TID 1, I = 3 (C = 2, W = 3)
---- TID 1 generates cycle 3: L = 959030623, F = 9.59031e+08
<<-- TID 1, I = 3 (C = 3, W = 0) L = 959030623, F = 9.59031e+08
TID 1 at work: I = 3, F = 9.59031e+08
-->> TID 1, I = 4 (C = 3, W = 1)
-CW- TID 1, I = 4 (C = 3, W = 1)
---- TID 0, I = 3 (C = 3, W = 1)
<<-- TID 0, I = 3 (C = 3, W = 1) L = 959030623, F = 9.59031e+08
TID 0 at work: I = 3, F = 9.59031e+08
---- TID 2, I = 3 (C = 3, W = 1)
<<-- TID 2, I = 3 (C = 3, W = 1) L = 959030623, F = 9.59031e+08
TID 2 at work: I = 3, F = 9.59031e+08
-->> TID 0, I = 4 (C = 3, W = 2)
-CW- TID 0, I = 4 (C = 3, W = 2)
-->> TID 2, I = 4 (C = 3, W = 3)
---- TID 2, I = 4 (C = 3, W = 3)
---- TID 2 generates cycle 4: L = 684387517, F = 6.84388e+08
<<-- TID 2, I = 4 (C = 4, W = 0) L = 684387517, F = 6.84388e+08
TID 2 at work: I = 4, F = 6.84388e+08
---- TID 1, I = 4 (C = 4, W = 0)
<<-- TID 1, I = 4 (C = 4, W = 0) L = 684387517, F = 6.84388e+08
TID 1 at work: I = 4, F = 6.84388e+08
---- TID 0, I = 4 (C = 4, W = 0)
<<-- TID 0, I = 4 (C = 4, W = 0) L = 684387517, F = 6.84388e+08
TID 0 at work: I = 4, F = 6.84388e+08
TID 0 returned 0x0
TID 1 returned 0x0
TID 2 returned 0x0

Notice that each thread happens to generate the random number at least once. That's not something that can be guaranteed, but it shows that no one thread is privileged.

pthread_once()

A first version of this answer mentioned and illustrated the pthread_once() call. This was not what was actually needed, but it could be useful in other contexts.

Assuming you have a sufficiently POSIX-compliant system, if you want only one thread to do something, but it does not matter which thread does it, then I believe you're looking for pthread_once().

#include <pthread.h>
#include <stdlib.h>

static pthread_once_t once_only = PTHREAD_ONCE_INIT;

static float gl_rand = 0;

static void pt_once(void)
{
    gl_rand = (float)lrand48();
}

void *thread_function(void *vp)
{
    pthread_once(&once_only, pt_once);
    /* …rest of the code… */
    return 0;
}

std::promise::set_value on FIFO thread pinned to a core doesn't wake std::future

I have rerun the tests on a SLES 11 / SP 2 box, and the pinning works.

As such, I'm going to mark this as the answer, which is: this is an issue related to SP 1.

Using pthread mutex shared between processes correctly

Am I right that pthread_mutex_init() doesn't provide any safe way to initialize a pthread_mutex_t simultaneously from different processes?

Correct. It is up to you to ensure that only one process calls pthread_mutex_init() on the mutex, and that no process tries to operate on the mutex until that call has successfully returned.

For example, with POSIX shm_open() shared memory regions, you can have the processes try to open the region with the O_CREAT and O_EXCL flags, so that exactly one process will succeed in creating it. This process is then responsible for resizing the shared memory region and initialising the mutex with pthread_mutex_init(). The other processes must then wait for some kind of notification from the initialising process before opening the shared memory region; e.g. you could have them block opening a FIFO O_RDONLY, and have the initialising process notify them by opening the FIFO O_WRONLY (which will cause the open to succeed).
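
A sketch of the creating process's side, assuming the region holds just the mutex; the region name is a placeholder, error handling is minimal, and note that the mutex must be marked process-shared:

#include <fcntl.h>
#include <pthread.h>
#include <sys/mman.h>
#include <unistd.h>

pthread_mutex_t *create_shared_mutex(void)
{
    /* O_EXCL guarantees exactly one process succeeds in creating it. */
    int fd = shm_open("/my-region", O_RDWR | O_CREAT | O_EXCL, 0600);
    if (fd < 0)
        return NULL; /* someone else is (or was) the creator */
    if (ftruncate(fd, sizeof(pthread_mutex_t)) != 0) {
        close(fd);
        return NULL;
    }

    pthread_mutex_t *m = (pthread_mutex_t *)mmap(NULL, sizeof *m,
            PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd); /* the mapping stays valid after closing the fd */
    if (m == MAP_FAILED)
        return NULL;

    /* Mark the mutex process-shared before anyone can use it. */
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(m, &attr);
    pthread_mutexattr_destroy(&attr);
    return m; /* now notify the waiting processes */
}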

Usually, a shared memory segment will not be the only communication channel between the processes. Typically you would bootstrap the communication through a UNIX domain socket and negotiate the setup of the shared memory region over it, probably even passing the shared memory region file descriptor through the socket with a SCM_RIGHTS message. The shared memory region would then be used to accelerate the performance-sensitive IPC.
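
For the fd-passing step, here is a sketch of the sending side using SCM_RIGHTS; the receiving side mirrors it with recvmsg():

#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

int send_fd(int sock, int fd)
{
    char dummy = 'x'; /* must transmit at least one byte of real data */
    struct iovec iov = { &dummy, 1 };
    char ctrl[CMSG_SPACE(sizeof(int))] = {};

    struct msghdr msg = {};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = ctrl;
    msg.msg_controllen = sizeof ctrl;

    struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
    cm->cmsg_level = SOL_SOCKET;
    cm->cmsg_type = SCM_RIGHTS; /* the kernel duplicates the fd for the peer */
    cm->cmsg_len = CMSG_LEN(sizeof(int));
    memcpy(CMSG_DATA(cm), &fd, sizeof(int));

    return sendmsg(sock, &msg, 0) < 0 ? -1 : 0;
}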

pthreads: how to assert code is run in a single threaded context

You could mark your library initialization function to be run prior to the application main(). For example, using GCC,

static void my_lib_init(void) __attribute__((constructor));

static void my_lib_init(void)
{
    /* ... */
}

Another option is to use posix_spawn() to fork and execute the worker processes as separate, slave binaries.

EDITED TO ADD:

It seems to me that if you wish to determine if the process has already created (actual, kernel-based) threads, you will have to rely on OS-specific code.

In the Linux case, the determination is simple, and safe to run on other OSes too. If it cannot determine the number of threads used by the current process, the function will return -1:

#include <unistd.h>
#include <sys/types.h>
#include <dirent.h>
#include <errno.h>

int count_threads_linux(void)
{
    DIR *dir;
    struct dirent *ent;
    int count = 0;

    /* Each task (kernel-level thread) of this process has an entry here. */
    dir = opendir("/proc/self/task/");
    if (!dir)
        return -1;

    while (1) {

        errno = 0;
        ent = readdir(dir);
        if (!ent)
            break;

        /* Skip "." and ".."; every other entry is a thread. */
        if (ent->d_name[0] != '.')
            count++;
    }

    if (errno) {
        /* readdir() failed; preserve its errno across closedir(). */
        const int saved_errno = errno;
        closedir(dir);
        errno = saved_errno;
        return -1;
    }

    if (closedir(dir))
        return -1;

    return count;
}
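
A hypothetical guard built on top of it, for use in the library's initialization path:

int n = count_threads_linux();
if (n > 1) {
    /* Threads already exist: we cannot assume a single-threaded context. */
} else if (n == -1) {
    /* Unknown (e.g. no /proc available): treat as "cannot verify". */
}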

There are certain cases (like chroot without /proc/) when that check will fail even in Linux, so the -1 return value should always be treated as unknown rather than error (although errno will indicate the actual reason for the failure).

Looking at the FreeBSD man pages, I wonder if the corresponding information is available at all.

Finally:

Rather than trying to detect the problematic case, I seriously recommend you fork() and exec() (or posix_spawn()) the slave processes, using only async-signal-safe functions (see man 7 signal) in the child process (prior to exec()), thus avoiding the fork()-thread complications. You can still create any shared memory segments, socket pairs, et cetera before forking. The only drawback I can see is that you have to use separate binaries for the slave workers. Which, given your description of them, does not sound like a drawback to me.
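
A sketch of that pattern; the worker binary's path and the fd-number convention are placeholders of mine, and only async-signal-safe calls appear between fork() and execv():

#include <unistd.h>

pid_t spawn_worker(int work_fd)
{
    pid_t pid = fork();
    if (pid == 0) {
        /* Child: async-signal-safe territory until execv(). */
        dup2(work_fd, 3); /* hand the worker its IPC channel as fd 3 */
        char *argv[] = { (char *)"worker", (char *)0 };
        execv("/usr/libexec/myapp/worker", argv);
        _exit(127);       /* exec failed */
    }
    return pid;           /* parent: child's pid, or -1 on failure */
}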


