Creating a Counter That Stays Synchronized Across Mpi Processes

Creating a counter that stays synchronized across MPI processes

Implementing a shared counter isn't trivial, but once you do it and have it in a library somewhere you can do a lot with it.

In the Using MPI-2 book, which you should have to hand if you're going to implement this stuff, one of the examples (the code is available online) is a shared counter. The "non-scalable" one should work well out to several dozens of processes -- the counter is an array of 0..size-1 of integers, one per rank, and then the `get next work item #' operation consists of locking the window, reading everyone elses' contribution to the counter (in this case, how many items they've taken), updating your own (++), closing the window, and calculating the total. This is all done with passive one-sided operations. (The better-scaling one just uses a tree rather than a 1-d array).

So the use would be you have say rank 0 host the counter, and everyone keeps doing work units and updating the counter to get the next one until there's no more work; then you wait at a barrier or something and finalize.

Once you have something like this - using a shared value to get the next work unit available - working, then you can generalize to more sophisticated approach. So as suzterpatt suggested, everyone taking "their share" of work units at the start works great, but what to do if some finish faster than others? The usual answer now is work-stealing; everyone keeps their list of work units in a dequeue, and then when one runs out of work, it steals work units from the other end of someone elses dequeue, until there's no more work left. This is really the completely-distributed version of master-worker, where there's no more single master partitioning work. Once you have a single shared counter working, you can make mutexes from those, and from that you can implement the dequeue. But if the simple shared-counter works well enough, you may not need to go there.

Update: Ok, so here's a hacky-attempt at doing the shared counter - my version of the simple one in the MPI-2 book: seems to work, but I wouldn't say anything much stronger than that (haven't played with this stuff for a long time). There's a simple counter implementation (corresponding to the non-scaling version in the MPI-2 book) with two simple tests, one corresponding roughly to your work case; each item updates the counter to get a work item, then does the "work" (sleeps for random amount of time). At the end of each test, the counter data structure is printed out, which is the # of increments each rank has done.

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
MPI_Win win;
int hostrank ;
int myval;
int *data;
int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
struct mpi_counter_t *count;

count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
count->hostrank = hostrank;
MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

if (count->rank == hostrank) {
MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
for (int i=0; i<count->size; i++) count->data[i] = 0;
MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
} else {
count->data = NULL;
MPI_Win_create(count->data, 0, 1,
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
count -> myval = 0;

return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc( count->size * sizeof(int) );
int val;

MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

for (int i=0; i<count->size; i++) {

if (i == count->rank) {
MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
count->win);
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}

MPI_Win_unlock(0, count->win);
count->myval += increment;

vals[count->rank] = count->myval;
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];

free(vals);
return val;
}

void delete_counter(struct mpi_counter_t **count) {
if ((*count)->rank == (*count)->hostrank) {
MPI_Free_mem((*count)->data);
}
MPI_Win_free(&((*count)->win));
free((*count));
*count = NULL;

return;
}

void print_counter(struct mpi_counter_t *count) {
if (count->rank == count->hostrank) {
for (int i=0; i<count->size; i++) {
printf("%2d ", count->data[i]);
}
puts("");
}
}

int test1() {
struct mpi_counter_t *c;
int rank;
int result;

c = create_counter(0);

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
result = increment_counter(c, 1);
printf("%d got counter %d\n", rank, result);

MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);
}

int test2() {
const int WORKITEMS=50;

struct mpi_counter_t *c;
int rank;
int result = 0;

c = create_counter(0);

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srandom(rank);

while (result < WORKITEMS) {
result = increment_counter(c, 1);
if (result <= WORKITEMS) {
printf("%d working on item %d...\n", rank, result);
sleep(random() % 10);
} else {
printf("%d done\n", rank);
}
}

MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);
}

int main(int argc, char **argv) {

MPI_Init(&argc, &argv);

test1();
test2();

MPI_Finalize();
}

MPI - Shared queue across processes

There are two solutions I would recommend.

The first as Gilles points out is to use MPI_ANY_SOURCE to receive 1000 completion messages which can be sent from any of the workers.

The second is to use MPI_ACCUMULATE. In this case, the master node shares a window which is initialized to 0, then each worker uses MPI_ACCUMULATE to increment the value in the window after each task is completed. The master polls it's own local window until it reaches 1000.

In this case I'd stick to MPI_ANY_SOURCE rather than mess with creating and destroying windows. I don't think there is a compelling reason to add that complexity here.

Reproducing some sort of shared memory in MPI

You won't be able to do this with the MPI-1 APIs you suggest above. However, MPI-2 allows for "remote memory operations" which allow you to do such a thing. I answered a very similar question here, based on the MPI-2 book and its online examples:
Creating a counter that stays synchronized across MPI processes There, only the "counter increment" is implemented. It doesn't do the broadcast; but do you really need such an operation? Wouldn't it be enough for the other tasks just to check out the value of the counter whenever it's needed?



Related Topics



Leave a reply



Submit