C++ Thread, Shared Data

Passing Data between thread using C issue

a sample program taken from https://computing.llnl.gov/tutorials/pthreads/#Mutexes and modified. This shows, how to use a globally declared data in multiple threads.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/*
The following structure contains the necessary information
to allow the function "dotprod" to access its input data and
place its output into the structure.
*/

typedef struct
{
double *a;
double *b;
double sum;
int veclen;
} DOTDATA;

/* Define globally accessible variables and a mutex */

#define NUMTHRDS 4
#define VECLEN 100

DOTDATA dotstr; //GLOBAL DATA which is going to be accessed by different threads

pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;

void *dotprod(void *arg)
{

/* Define and use local variables for convenience */

int i, start, end, len ;
long offset;
double mysum, *x, *y;
offset = (long)arg;

len = dotstr.veclen;
start = offset*len;
end = start + len;
x = dotstr.a;
y = dotstr.b;

/*
Perform the dot product and assign result
to the appropriate variable in the structure.
*/

mysum = 0;
for (i=start; i<end ; i++)
{
mysum += (x[i] * y[i]);
}

/*
Lock a mutex prior to updating the value in the shared
structure, and unlock it upon updating.
*/
pthread_mutex_lock (&mutexsum);
dotstr.sum += mysum;
pthread_mutex_unlock (&mutexsum);

pthread_exit((void*) 0);
}

int main (int argc, char *argv[])
{
long i;
double *a, *b;
void *status;

/* Assign storage and initialize values */
a = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double));
b = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double));

for (i=0; i<VECLEN*NUMTHRDS; i++)
{
a[i]=1.0;
b[i]=a[i];
}

dotstr.veclen = VECLEN;
dotstr.a = a;
dotstr.b = b;
dotstr.sum=0;

pthread_mutex_init(&mutexsum, NULL);

for(i=0; i<NUMTHRDS; i++)
{
/*
Each thread works on a different set of data.
The offset is specified by 'i'. The size of
the data for each thread is indicated by VECLEN.
*/
pthread_create(&callThd[i], NULL, dotprod, (void *)i);
}

/* Wait on the other threads */
for(i=0; i<NUMTHRDS; i++)
{
pthread_join(callThd[i], &status);
}

printf ("Sum = %f \n", dotstr.sum);
free (a);
free (b);
pthread_mutex_destroy(&mutexsum);
pthread_exit(NULL);//No need of pthread_join() if pthread_exit() used.
}

Sharing Data among pthreads in C

The global table can be split into subsets of equal sizes assigned to each threads. They receive the size of the subset, the offset of the subset in the global vector table and they will return a computation result. So, defining a structure to pass those parameters to the threads is a solution:

// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
};

The threads must start at the same time otherwise some threads may finish before others are created. Using a barrier to make the thread start the computing at the "same time" is a solution. The main program initializes the barrier with the number of involved threads (this includes the main thread):

// Thread synchronization
pthread_barrier_t barrier;
[...]
// Initialize the barrier (number of secondary threads + main thread)
rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
if (rc != 0) {
errno = rc;
perror("pthread_barrier_init()");
return 1;
}
[...]
// Synchronize with the threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return 1;
}

As you said that you are a beginner with the pthreads, note that upon error, the functions do not set errno but return the error code. Hence, to use errno in the error messages, don't forget to set errno with the return code of the failing pthread function. For example:

rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}

Then, the program can take as parameters, the number of computing threads and the vector's values:

  if (ac < 3) {
fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
return 1;
}

Here is an example of source code for the program:

/*

Compute the Euclidean norm (https://en.wikipedia.org/wiki/Norm_(mathematics))

*/
#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <libgen.h>

// Vector table
float *vec;

// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
};

// Thread synchronization
pthread_barrier_t barrier;

void *th_entry(void *arg)
{
struct th_param *param = (struct th_param *)arg;
int i;
int rc;

param->result = 0;

// Synchronize with the other threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return NULL;
}

for (i = 0; i < param->nb; i ++) {
param->result += vec[param->offset + i] * vec[param->offset + i];
}

return NULL;
}

int main(int ac, char *av[])
{
int i;
size_t nb_vec;
int nb_threads;
struct th_param *th_table;
int rc;
double result;

if (ac < 3) {
fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
return 1;
}

nb_threads = atoi(av[1]);

if (nb_threads <= 0) {
fprintf(stderr, "Bad number of threads\n");
return 1;
}

nb_vec = ac - 2;

if (nb_threads > nb_vec) {
fprintf(stderr, "Too many threads\n");
return 1;
}

// Allocate the vector table
vec = (float *)malloc(nb_vec * sizeof(float));
if (!vec) {
perror("malloc");
return 1;
}

// Allocate the per-thread parameters
th_table = (struct th_param *)malloc(nb_threads * sizeof(struct th_param));
if (!th_table) {
perror("malloc");
return 1;
}

// Populate the vector table
for (i = 0; i < nb_vec; i ++) {

vec[i] = strtof(av[i + 2], NULL);

}

// Initialize the barrier (number of secondary threads + main thread)
rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
if (rc != 0) {
errno = rc;
perror("pthread_barrier_init()");
return 1;
}

// Create the threads
for (i = 0; i < (nb_threads - 1); i ++) {

th_table[i].offset = i * (nb_vec / nb_threads);
th_table[i].nb = nb_vec / nb_threads;

printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);

rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}

}

// The last thread may have less/more slots
th_table[i].offset = i * (nb_vec / nb_threads);
th_table[i].nb = nb_vec / nb_threads;
th_table[i].nb = nb_vec - (i * th_table[i].nb);

printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);

rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}

result = 0;

// Synchronize with the threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return 1;
}

for (i = 0; i < nb_threads; i ++) {

rc = pthread_join(th_table[i].tid, NULL);
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}

result += th_table[i].result;

}

result = sqrt(result);

printf("Result=%f\n", result);

return 0;

}

Built it:

$ gcc euclidean_norm.c -o euclidean_norm -lpthread -lm

Run it:

$ ./euclidean_norm
Usage: euclidean_norm nb_threads x1 x2 x3...
$ ./euclidean_norm 1 1 2 3
Thread#0, offset=0, nb=3
Result=3.741657
$ ./euclidean_norm 2 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=2
Result=3.741657
$ ./euclidean_norm 3 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=1
Thread#2, offset=2, nb=1
Result=3.741657

To measure the time duration, it is possible to use time command or for finer granularity functions like gettimeofday()...

You may also make the threads run on separate CPU cores by passing CPU affinity attributes at thread creation time as second parameter of pthread_create (cf. pthread_attr_init) or pass the CPU core number in the parameters and call pthread_set_affinity_np in the thread's entry point.

You may also consider setting the scheduling policy/priority of the threads with pthread_setschedparam or the aforementioned thread's attributes.

When implementing data structures for multi-threaded applications, there is a common pitfall consisting to forget or underestimate the impact of the false sharing on the performances. That is why data structure alignments on cache line sizes is also an important concern (e.g. gcc provides the aligned attribute). In the application example above, when a thread writes its results into the parameter structure, it may trigger false sharing with the other threads if contiguous parameter structures share the same cache lines. To solve this with gcc, we can use the aligned attribute to make each entry of the table of parameters be on a separate cache line. One way to get the cache line size if to look at /proc/cpuinfo:

$ cat /proc/cpuinfo | grep cache_alignment
cache_alignment : 64
[...]

The structure could be redefined as:

// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
} __attribute__ ((aligned (64)));

pthreads shared memory between threads

If you pass the address of i to all functions, and they each try to modify it, well of course i gets messed up because they all have the address of the same variable. What you need is to give each thread the range they need to work on and let them iterate over it with a local variable. Something like this:

struct thread_data
{
int start;
int end;
pthread_t tid;
};

struct thread_data td[N]; // assuming N threads

/* divide up the ranges */
for (i = 0; i < N; ++i)
td[i] = (struct thread_data){ .start = i*len/N, .end = (i+1)*len/N };
td[N-1].end = len;

/* create the threads */
for (i = 0; i < N; ++i)
pthread_create(&td[i].tid, NULL, func, &td[i]);

and in func:

void *func(void *arg)
{
struct thread_data *data = arg;

for (int i = data->start; i < data->end; ++i)
/* do some work */
}

You may also be interested in learning about OpenMP which is designed to automate exactly what you are asking for.

Memory sharing between threads in C

You've actually asked two separate questions.

what memory is being shared between threads?

Well, all memory (on typical OSes). A main difference between threads and processes is that different processes have different memory spaces, which threads within a process have the same memory space.

See also:

What is the difference between a process and a thread?

will [the two threads] share the same value for MyVariable?

No! and that's because each thread has its own stack (and their own registers state). Now, the stacks are both in the shared memory space, but each thread uses a different one.

So: Sharing memory space is not the same as sharing the value of each variable.

Sharing generic data between threads

This is a producer-consumer scenario, and my preference is to implement a blocking queue.

The event handler pushes messages onto the queue. The worker thread blocks on the queue when it is empty, and wakes up when there is something to process.

The messages can be allocated and freed, but it's usually better if they are drawn from a free list. Messages are simply a union of types, with a type field and an id in common. Nice and simple.

Extending to multiple worker threads and/or new message types is straightforward.

You can easily find pre-written code for a blocking queue of this kind in your language of choice.

[There may be something about your constraints that don't fit this approach, but if so please edit your question to make it clear.]


There are three basic strategies for a variable length blocking queue in C++. Assume that a message is a union of different types of different sizes.

  1. The messages on the queue are fixed in size and contain a pointer to a message. Each message is allocated and freed as needed.
  2. The messages on the queue are fixed in size and contain a pointer to a message. Messages are kept in pools (free lists/arrays) to avoid allocation overhead.
  3. The messages on the queue are variable in size. They consist of a stream of bytes pushed onto the queue, with a length to know how much to pop off.

You can easily find sample code and full working examples by searching for "blocking queue variable length C++" or similar.

Do threads share local variables?

Threads usually share the following.

  1. Data Segment(global variables,static data)
  2. Address space.
  3. Code Segment.
  4. I/O, if file is open , all threads can read/write to it.
  5. Process id of parent.
  6. The Heap

But threads maintain their own copy of stack,and local variables are stored on the stack so yeah you are right that each thread should have its own copy of local variables.

May be its a bad terminology used or may be its something specific to problem set.

Realtime data sharing in between threads

You can set up one or multiple atomic queues from the sensor threads to the consumer(s). This way you don't have to do any locking on your own.

For example, a queue from Intel TBB.



Related Topics



Leave a reply



Submit