How to Increment a Shared Counter from Multiple Processes

How to increment a shared counter from multiple processes?

The problem is that the counter variable is not shared between your processes: each separate process creates its own local instance and increments that.

See the "Sharing state between processes" section of the multiprocessing documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note it uses global values which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter
    # as it starts.
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print(i.get())

Sharing a counter with multiprocessing.Pool

The RuntimeError you get when using Pool occurs because arguments to pool methods are pickled before being sent over a (pool-internal) queue to the worker processes.
Which pool method you are trying to use is irrelevant here. This doesn't happen when you just use Process because there is no queue involved. You can reproduce the error with just pickle.dumps(multiprocessing.Value('i', 0)).
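
To see the pickling failure in isolation, here is a minimal sketch that needs no pool at all. The helper name try_pickle_value is made up for this illustration; only pickle.dumps and multiprocessing.Value come from the answer above.

```python
import pickle
from multiprocessing import Value


def try_pickle_value():
    """Try to pickle a shared Value outside of process start-up.

    Returns the error message, or None if pickling unexpectedly succeeded.
    """
    try:
        pickle.dumps(Value('i', 0))
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == '__main__':
    # Prints the message of the RuntimeError raised by pickle.dumps
    print(try_pickle_value())
```

The message should say that such objects may only be shared between processes through inheritance, which is exactly what the pool initializer takes care of.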

Your last code snippet doesn't work how you think it works. You are not sharing a Value, you are recreating independent counters for every child process.

If you were on Unix and used the default start method "fork", you would be done by simply not passing the shared objects as arguments to the pool methods.
Your child processes would inherit the globals through forking. With the start methods "spawn" (the default on Windows, and on macOS since Python 3.8) or "forkserver", you have to use the initializer during Pool instantiation to let the child processes inherit the shared objects.

Note, you don't need an extra multiprocessing.Lock here, because multiprocessing.Value comes by default with an internal one you can use.

import os
from multiprocessing import Pool, Value #, set_start_method

def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
            print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')

def init_globals(counter):
    global cnt
    cnt = counter

if __name__ == '__main__':

    # set_start_method('spawn')

    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000

Probably worth noting as well is that you don't need the counter to be shared in all cases.
If you just need to keep track of how often something has happened in total, an option would be to keep separate worker-local counters during computation which you sum up at the end.
This could result in a significant performance improvement for frequent counter updates for which you don't need synchronization during the parallel computation itself.
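
A rough sketch of that worker-local approach, assuming the work can be split into chunks up front. The names count_even and split are invented for this example, and counting even numbers just stands in for "something happened":

```python
from multiprocessing import Pool


def count_even(numbers):
    """Count even values using a plain local variable; no lock is needed."""
    local_count = 0
    for n in numbers:
        if n % 2 == 0:
            local_count += 1
    return local_count


def split(data, n_chunks):
    """Split data into n_chunks interleaved slices."""
    return [data[i::n_chunks] for i in range(n_chunks)]


if __name__ == '__main__':
    chunks = split(list(range(100000)), 4)
    with Pool(4) as pool:
        # Each worker returns its own count; the parent sums them once.
        total = sum(pool.map(count_even, chunks))
    print(total)
```

The only cross-process communication here is one integer per chunk, instead of one lock acquisition per increment.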

How to increment counter in different process python?

You need to explicitly share the memory to make it work properly:

from multiprocessing import Process, Lock, Value

start = Value('I',0)

def printer(item):
    """
    Prints out the item that was passed in
    """
    with start.get_lock():
        start.value += 1
        print(start.value)

Note the multiprocessing Value wrapper comes with its own lock.

Python multiprocessing: Increment values of shared variables across processes

mp.Lock() creates a lock object, but creating it does not by itself lock any block of code. To protect a critical region, you first call the acquire method of the mp.Lock object and then call its release method after the critical region.


import multiprocessing as mp

INIT_NODE = mp.Value('i', 1000)
INIT_WAY = mp.Value('i', 1000)
INIT_REL = mp.Value('i', 1000)
lock = mp.Lock()

def process_results(a):
    lock.acquire()

    INIT_NODE.value += 20000000
    INIT_WAY.value += 10000000
    INIT_REL.value += 1000000
    # print_locked is the helper from the original program (not shown here)
    print_locked(INIT_NODE, INIT_WAY, INIT_REL)
    lock.release()

Now the output of this program is the same on every run:

20001000 10001000 1001000
40001000 20001000 2001000
60001000 30001000 3001000
80001000 40001000 4001000
100001000 50001000 5001000
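
The acquire/release pair can also be written as a with block, which releases the lock even if the critical region raises. A small sketch of that variant follows; the function names bump and run and the step sizes are made up here, and the values are created with lock=False so that the single mp.Lock is the only lock guarding both of them:

```python
import multiprocessing as mp


def bump(nodes, ways, lock, repeats):
    """Update both shared values together while holding one lock."""
    for _ in range(repeats):
        with lock:  # acquire() on entry, release() on exit, even on exceptions
            nodes.value += 2
            ways.value += 1


def run(n_procs=4, repeats=1000):
    """Start n_procs processes that all update the same two shared values."""
    nodes = mp.Value('i', 0, lock=False)  # raw shared ints; `lock` guards them
    ways = mp.Value('i', 0, lock=False)
    lock = mp.Lock()
    procs = [mp.Process(target=bump, args=(nodes, ways, lock, repeats))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return nodes.value, ways.value


if __name__ == '__main__':
    print(run())
```

Because both updates happen inside one critical region, no process can observe one value updated without the other.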

Python multiprocessing - shared counter in global list is not incrementing properly

More specific than my comment, your problem is that you're fundamentally misunderstanding what multiprocessing is doing, which produces faulty expectations for your output. You can't declare a global variable and then share it across multiple processes. You can get away with a little bit more when you use Threads, but in order to understand why you're having trouble, you need to realize what multiprocessing is doing.

When Pool.map() fires up your child processes, each of them launches its own Python interpreter where it imports your top-level function process_item. This separate interpreter instance also creates its own instance of list_global. That happens for every child process. Your calls to global don't just magically make those separate running processes share a list defined in your module.
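
A small sketch that makes this visible. It reuses the names list_global and process_item from the question, but the bodies are invented here purely for illustration:

```python
from multiprocessing import Pool

list_global = []  # each worker process ends up with its own copy of this list


def process_item(item):
    """Append to the module-level list and return its length as seen by this process."""
    list_global.append(item)
    return len(list_global)


if __name__ == '__main__':
    with Pool(2) as pool:
        seen_lengths = pool.map(process_item, range(6))
    print(seen_lengths)      # lengths observed inside the workers
    print(len(list_global))  # the parent's list was never touched by the workers
```

The appends happen only in the workers' copies, so the parent's list stays empty. If the parent needs the data, return it from the worker function and collect it from the result of pool.map.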

Atomically incrementing an integer in shared memory for multiple processes on linux x86-64 with gcc

Using either an int or std::atomic works.

One of the great things about the std::atomic interface is that it plays quite nicely with the int "interface". So, the code is almost exactly the same. One can switch between each implementation below by adding a #define USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER true.

I'm not so sure about statically creating the std::atomic in shared memory, so I use placement new to allocate it. My guess is that relying on the static allocation would work, but it may technically be undefined behavior. Figuring that out is beyond the scope of my question, but a comment on that topic would be quite welcome.

signaling_incrementing_counter.h

#include <atomic>
#include <pthread.h>
#include "gpu_base_constants.h"

struct SignalingIncrementingCounter {
public:
    /**
     * We will either count up or count down to the given limit. Once the limit is reached, whatever is waiting on this counter will be signaled and allowed to proceed.
     */
    void init(const int upper_limit_);
    void reset_to_empty();
    void increment(); // only valid when counting up
    void block_until_full(const char * comment = {""});
    // We don't have a use-case for the block_until_non_full

private:

    int upper_limit;

#if USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    volatile int value;
#else // USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    std::atomic<int> value;
    std::atomic<int> * value_ptr;
#endif // USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER

    pthread_mutex_t mutex;
    pthread_cond_t cv;

};

signaling_incrementing_counter.cpp

#include <pthread.h>
#include <cerrno>    // ETIMEDOUT
#include <cstdio>    // fprintf, printf
#include <ctime>     // clock_gettime
#include <new>       // placement new
#include <stdexcept>

#include "signaling_incrementing_counter.h"

void SignalingIncrementingCounter::init(const int upper_limit_) {

    upper_limit = upper_limit_;
#if !USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    value_ptr = new(&value) std::atomic<int>(0);
#endif // USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    {
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        int retval = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        if (retval) {
            throw std::runtime_error("Error while setting sharedp field for mutex");
        }
        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);

        pthread_mutex_init(&mutex, &attr);
        pthread_mutexattr_destroy(&attr);
    }

    {
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);

        pthread_cond_init(&cv, &attr);
        pthread_condattr_destroy(&attr);
    }

    reset_to_empty(); // should be done at end, since mutex functions are called
}

void SignalingIncrementingCounter::reset_to_empty() {
    int mutex_rv = pthread_mutex_lock(&mutex);
    if (mutex_rv) {
        throw std::runtime_error("Unexpected error encountered while grabbing lock. Investigate.");
    }
    value = 0;
    // No need to signal, because there is no function that unblocks when the value changes to 0
    pthread_mutex_unlock(&mutex);
}

void SignalingIncrementingCounter::increment() {
    fprintf(stderr, "incrementing\n");
    int mutex_rv = pthread_mutex_lock(&mutex);
    if (mutex_rv) {
        throw std::runtime_error("Unexpected error encountered while grabbing lock. Investigate.");
    }
    ++value;
    fprintf(stderr, "incremented\n");

    if (value >= upper_limit) {
        pthread_cond_broadcast(&cv);
    }
    pthread_mutex_unlock(&mutex);
}

void SignalingIncrementingCounter::block_until_full(const char * comment) {
    struct timespec max_wait = {0, 0};
    int mutex_rv = pthread_mutex_lock(&mutex);
    if (mutex_rv) {
        throw std::runtime_error("Unexpected error encountered while grabbing lock. Investigate.");
    }
    while (value < upper_limit) {
        int val = value;
        printf("blocking during increment until full, value is %i, for %s\n", val, comment);
        /*const int gettime_rv =*/ clock_gettime(CLOCK_REALTIME, &max_wait);
        max_wait.tv_sec += 5;
        const int timed_wait_rv = pthread_cond_timedwait(&cv, &mutex, &max_wait);
        if (timed_wait_rv)
        {
            switch(timed_wait_rv) {
            case ETIMEDOUT:
                break;
            default:
                pthread_mutex_unlock(&mutex);
                throw std::runtime_error("Unexpected error encountered. Investigate.");
            }
        }
    }
    pthread_mutex_unlock(&mutex);
}

Python using multiprocess to speed up merging counters

The call combined = future.result() blocks until the result is completed so you are not submitting a subsequent request to the pool until the previous request completes. In other words, you never have more than one subprocess running. At the very least you should change your code to:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future) # save it
    results = [f.result() for f in the_futures] # all the results

Another way:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future) # save it
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures): # not necessarily the order of submission
        result = future.result() # do something with this

Also, if you do not specify max_workers to the ProcessPoolExecutor constructor, it defaults to the number of processors on your machine. For CPU-bound work like this, there is usually nothing to be gained by specifying a value greater than the number of processors that you actually have.

Update

If you want to process the results as soon as they are completed and need a way to tie a result back to the original request, you can store the futures as keys in a dictionary where the corresponding values represent the requests' arguments. In this case:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples # map future to request
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures): # not necessarily the order of submission
        samples = the_futures[future] # the request
        result = future.result() # the result
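
For reference, here is a self-contained sketch of the same pattern with all imports in place. The contents of item2item and sample_list are toy stand-ins invented for this example (the real ones come from the question), and merge_counters is a hypothetical wrapper around the reduce call:

```python
from collections import Counter
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import reduce
from operator import add


def merge_counters(counters):
    """Sum a list of Counter objects into a single Counter."""
    return reduce(add, counters, Counter())


if __name__ == '__main__':
    # Toy stand-ins for the question's item2item and sample_list.
    item2item = {'a': Counter(x=1), 'b': Counter(x=2, y=1), 'c': Counter(y=5)}
    sample_list = [['a', 'b'], ['b', 'c'], ['a', 'c']]

    with ProcessPoolExecutor(max_workers=2) as pool:
        # Submit everything first, mapping each future back to its request.
        the_futures = {
            pool.submit(merge_counters, [item2item[s] for s in samples]): samples
            for samples in sample_list
        }
        # Then collect results as they finish (not necessarily in submission order).
        for future in as_completed(the_futures):
            print(the_futures[future], future.result())
```

Submitting all the work before calling result() on anything is what lets the workers actually run in parallel.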

