Combine Pool.map with shared memory Array in Python multiprocessing

Trying again as I just saw the bounty ;)

Basically, I think the error message means what it says: multiprocessing shared-memory Arrays can't be passed as arguments (by pickling). Serialising the data would make no sense, since the whole point is that it lives in shared memory. So you have to make the shared array global. I think it's neater to make it an attribute of a module, as in my first answer, but leaving it as a global variable, as in your example, also works well. Taking on board your point about not wanting to set the data before the fork, here is a modified example. If you wanted more than one possible shared array (and that's why you wanted to pass toShare as an argument), you could similarly make a global list of shared arrays and just pass the index to count_it (whose loop would become for c in toShare[i]:).

from multiprocessing import Pool, Array

def count_it(key):
    # toShare is a module-level global that the forked workers inherit
    count = 0
    for c in toShare:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    # allocate shared array - want lock=False in this case since we
    # aren't writing to it and want to allow multiple processes to access
    # at the same time - I think with lock=True there would be little or
    # no speedup
    maxLength = 50
    toShare = Array('c', maxLength, lock=False)

    # fork
    pool = Pool()

    # can set data after fork
    # (in Python 3 a 'c' array holds bytes, so the data and keys are bytes)
    testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    if len(testData) > maxLength:
        raise ValueError("Shared array too small to hold data")
    toShare[:len(testData)] = testData

    print(pool.map(count_it, [b"a", b"b", b"s", b"d"]))

EDIT: The above doesn't work on Windows, because Windows has no fork. However, the version below does work on Windows, still using Pool, so I think this is the closest to what you want:

from multiprocessing import Pool, Array
import mymodule  # assumed here to be an (empty) mymodule.py next to this script

def count_it(key):
    count = 0
    for c in mymodule.toShare:
        if c == key:
            count += 1
    return count

def initProcess(share):
    # runs once in every worker: store the shared array where count_it finds it
    mymodule.toShare = share

if __name__ == '__main__':
    # allocate shared array - want lock=False in this case since we
    # aren't writing to it and want to allow multiple processes to access
    # at the same time - I think with lock=True there would be little or
    # no speedup
    maxLength = 50
    toShare = Array('c', maxLength, lock=False)

    # fork (or spawn, on Windows)
    pool = Pool(initializer=initProcess, initargs=(toShare,))

    # can set data after fork
    # (in Python 3 a 'c' array holds bytes, so the data and keys are bytes)
    testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    if len(testData) > maxLength:
        raise ValueError("Shared array too small to hold data")
    toShare[:len(testData)] = testData

    print(pool.map(count_it, [b"a", b"b", b"s", b"d"]))

Not sure why map won't pickle the array but Process and Pool will. I think perhaps it has to be transferred at the point of subprocess initialization on Windows. Note that the data is still set after the fork, though.
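
The multi-array variant mentioned earlier (a global list of shared arrays, with only an index passed through pool.map) might be sketched as follows. This is a minimal Python 3 sketch rather than code from the original answer: the names SHARED, init_worker and make_shared, and the two sample byte strings, are made up for illustration, and it uses the initializer approach so that it does not depend on fork.

```python
from multiprocessing import Pool, Array

# Hypothetical module-level list of shared arrays, filled in by init_worker.
SHARED = []

def init_worker(arrays):
    # Runs once per worker: remember the shared arrays in a global.
    SHARED[:] = arrays

def count_it(task):
    # task is (index of the shared array, byte to count); only these
    # small values are pickled per task, never the arrays themselves.
    i, key = task
    return sum(1 for c in SHARED[i] if c == key)

def make_shared(data):
    # Copy a bytes object into a lock-free shared char array.
    arr = Array('c', len(data), lock=False)
    arr[:] = data
    return arr

if __name__ == '__main__':
    arrays = [make_shared(b"abcabc"), make_shared(b"bbbsss")]
    tasks = [(0, b"a"), (0, b"b"), (1, b"b"), (1, b"s")]
    with Pool(2, initializer=init_worker, initargs=(arrays,)) as pool:
        print(pool.map(count_it, tasks))
```

The arrays travel to the workers exactly once, through initargs; each task then carries only an index and a key.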

Passing shared memory variables in python multiprocessing

When I define arr = np.zeros(4), which processor owns this variable?

Only the main process should have access to this. If you use "fork" as the start method, everything will be accessible to the child process, but as soon as something tries to modify it, it will be copied to its own private memory space before being modified (copy-on-write). This reduces overhead if you have large read-only arrays, but doesn't help you much for writing data back to those arrays.

What is being sent if this variable is not defined on those processors?

A new array is created within the child process when the arguments are reconstructed after being sent from the main process via a pipe and pickle. The data is serialized to a byte stream and reconstructed, so no information other than the values in the slice remains. It's a totally new object.
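
The round trip can be imitated in a single process to see why writes to the argument never reach the original. This is only an illustration: it uses a plain list in place of the numpy array, and pickle directly in place of the pipe that multiprocessing uses.

```python
import pickle

original = [0.0, 0.0, 0.0, 0.0]

# Roughly what happens to an argument on its way to a worker:
# it is pickled to bytes, sent through a pipe, and unpickled there.
payload = pickle.dumps(original[1:2])
received = pickle.loads(payload)

received[0] = 99.0          # the "worker" modifies its argument
print(received, original)   # [99.0] [0.0, 0.0, 0.0, 0.0]
```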

Why doesn't example 2 work while example 3 does?

Example 3 works because at the time of "fork" (the moment you call Pool), arr has already been created, and will be shared. It's also important that you used an Array to create it, so that when you modify the data, the modification happens in shared memory (the exact mechanics of this are complicated).

Example 2 fails for much the same reason as example 1: you pass a slice of an array as an argument, which gets converted into a totally new object, so arr inside your do_stuff function is just a copy of arr[i:i+1] from the main process. It is still important to create anything which will be shared between processes before calling Pool (if you're relying on "fork" to share the data), but that's not why this example doesn't work.

You should know: example 3 only works because you're on Linux, where the default start method is fork. This is not the preferred start method, because copying lock objects in a locked state can cause deadlocks. It will not work on Windows at all, and won't work on macOS by default on Python 3.8 and above.
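
To find out which situation applies on a given machine, the start method can be inspected, and pinned for one pool through a context. A minimal sketch; the helper name describe_start_methods is made up for illustration:

```python
import multiprocessing as mp

def describe_start_methods():
    # Methods supported on this platform, e.g. 'fork', 'spawn', 'forkserver'.
    available = mp.get_all_start_methods()
    # allow_none=True reports None if no default has been fixed yet,
    # instead of fixing the platform default as a side effect.
    current = mp.get_start_method(allow_none=True)
    return available, current

if __name__ == '__main__':
    print(describe_start_methods())
    # A context pins the start method for the objects created from it,
    # without changing the global default.
    ctx = mp.get_context('spawn')
    print(ctx.get_start_method())
```

Running code with a pool created from a 'spawn' context is a quick way to check whether it silently depends on fork.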

The best (most portable) solution to all this is to pass the Array itself as the argument, and reconstruct the numpy array inside the child process. This has the complication that "shared objects" can only be passed as arguments at the creation of the child process. This isn't as big a deal if you use Process, but with Pool, you basically have to pass any shared objects as arguments to an initialization function, and get the reconstructed array as a global variable of the child's scope. In this example, for instance, you will get an error trying to pass buf as an argument with p.map or p.apply, but not when passing buf as initargs=(buf,) to Pool().

import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr #use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i]=i

if __name__ == '__main__':
    idx = [0,1,2,3]

    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0

    #"with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
    print(arr)

With Python 3.8 and above there's a newer module, multiprocessing.shared_memory, which is better than Array or any of the other sharedctypes classes. It is a bit more complicated to use, and has some additional OS-dependent nastiness, but it's theoretically lower overhead and faster. If you want to go down the rabbit hole, I've written a few answers on the topic of shared_memory, and have recently been answering lots of questions on concurrency in general, if you want to take a gander at my answers from the last month or two.
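
As a rough idea of what the shared_memory route looks like, here is a minimal sketch under a few assumptions: it is not taken from the answers mentioned above, the names write_square and read_all and the sizes N and ITEM are made up, and it uses a plain memoryview cast to doubles instead of numpy to stay self-contained (with numpy, np.ndarray(shape, dtype=..., buffer=shm.buf) plays the same role).

```python
from multiprocessing import Pool, shared_memory

N = 4       # number of float64 slots (illustrative)
ITEM = 8    # bytes per float64

def write_square(task):
    # Attach to the existing block by name; only the name is pickled.
    name, i = task
    shm = shared_memory.SharedMemory(name=name)
    view = shm.buf[:N * ITEM].cast('d')
    view[i] = float(i * i)
    view.release()   # views must be released before close()
    shm.close()

def read_all(shm):
    # Copy the doubles out of the block into an ordinary list.
    view = shm.buf[:N * ITEM].cast('d')
    values = list(view)
    view.release()
    return values

if __name__ == '__main__':
    shm = shared_memory.SharedMemory(create=True, size=N * ITEM)
    try:
        shm.buf[:N * ITEM] = bytes(N * ITEM)   # zero the block
        with Pool(2) as pool:
            pool.map(write_square, [(shm.name, i) for i in range(N)])
        print(read_all(shm))
    finally:
        shm.close()
        shm.unlink()   # free the block once, from the creating process
```

Unlike an Array, the block is found by name, so the name can be sent through p.map like any other string; the price is that closing and unlinking have to be done by hand.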

python pool.map() and shared array variable

What you are doing is incorrect. It would work using multiple threads, since they share memory, but multiple processes do not share memory! The memory is copied to the child processes so each one has its own copy of the "shared" array and the main process has the original copy which is never touched.

The multiprocessing module does have some facilities to handle shared state; however, be aware that shared objects are way slower than normal Python objects. It's often easier instead to make the child processes completely independent and let the main process recombine the outputs in the end.

In your case you can use the Manager object to instantiate a shared list:

import multiprocessing

class MyClass():

    shared = []
    l = multiprocessing.Lock()

    def __init__(self):
        with multiprocessing.Manager() as manager, multiprocessing.Pool(processes=2) as pool:
            self.shared = manager.list([])
            pool.map(self.mythread, range(1,10))

            print(self.shared)

    def mythread(self, param):
        print(param)
        self.l.acquire()
        self.shared.append(param)
        print(self.shared)
        self.l.release()

if __name__ == "__main__":
    instance = MyClass()

But this is probably going to be faster:

import multiprocessing

class MyClass():

    def __init__(self):
        with multiprocessing.Pool(processes=2) as pool:
            results = pool.map(self.mythread, range(1,10))
            final_result = [x for res in results for x in res]
            print(final_result)

    def mythread(self, param):
        print(param)
        # assume that this might return more than one value
        return [param, ...]

if __name__ == "__main__":
    instance = MyClass()

BTW: in the case where the function applied with map has a result that is independent of the other subprocesses, and it has only one result, simply using map is going to return the list of values you want.

Multiprocessing in Python: Pool and Process with shared array

np.linalg.solve should already run in parallel, since it is implemented on top of LAPACK. In fact, this is the case on my (Linux + Windows) machine. Indeed, it calls LAPACK functions like dtrsm and dlaswp, and the main computational function, dgemm, which is implemented in BLAS libraries. This last function should take more than 90% of the time and is heavily optimized and parallelized as long as you use a fast BLAS implementation. NumPy uses OpenBLAS by default on most systems, which is very good (and parallel). The Intel MKL is also a good alternative supporting LAPACK (certainly better on Intel hardware). If the computation is not parallel on your machine, this is a problem and you should check your BLAS implementation, as it may be very inefficient.

The thing is, parallelizing code that is already parallel makes it slower, because running more threads than there are available cores puts a lot of pressure on the OS scheduler, and the BLAS functions are not optimized for such a case. More specifically, profiling results show that parallelizing a parallel OpenBLAS function causes some synchronization functions to wait for a while because of work imbalance (certainly due to a static schedule of the computing kernels). This is the main source of slowdown of approaches 2/3/4 compared to the first sequential approach.

If you really want to parallelize the function yourself, you need to configure the BLAS to use 1 thread (with OMP_NUM_THREADS=1 on OpenBLAS), but this is likely to be less efficient than letting the BLAS do the parallelization. Indeed, BLAS makes use of optimized multi-threaded kernels (working in shared memory), whereas Python nearly prevents such a design from being fast because of the global interpreter lock (GIL) in multi-threaded code. Multiprocessing is also limited by the overhead of pickling or forking. That being said, such overheads are small in approaches 2 and 3 (not in 4, certainly due to the queue). This is generally why Python is often not great for writing fast parallel applications (except for long-lasting embarrassingly parallel work with small data transfers).
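
One way to apply the single-thread BLAS setting from inside a script, rather than from the shell, is to set the environment variable before numpy is imported. This is a sketch under assumptions: the helper limit_blas_threads is made up for illustration, and OPENBLAS_NUM_THREADS and MKL_NUM_THREADS are added here (they are not mentioned above) to cover the OpenBLAS-specific and MKL variables as well.

```python
import os

def limit_blas_threads(n=1):
    # Hypothetical helper: ask common BLAS backends for n threads each.
    # The variables are typically read when the BLAS library is loaded,
    # so this has to run before `import numpy`.
    for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
        os.environ[var] = str(n)

limit_blas_threads(1)
# import numpy as np   # import only after the variables are set
```

Worker processes started afterwards inherit the environment, so each of them also runs its BLAS calls on a single thread.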

Here are the average timings on my 6-core i5-9600KF machine with OpenBLAS:

Default OpenBLAS configuration:
- Approach 1: 0.129 seconds (stable)
- Approach 2: 1.623 seconds (highly-variable)
- Approach 3: 1.407 seconds (highly-variable)
- Approach 4: 1.532 seconds (highly-variable)

Sequential configuration of OpenBLAS:
- Approach 1: 0.338 seconds (stable) <--- Now sequential
- Approach 2: 0.152 seconds (quite stable)
- Approach 3: 0.151 seconds (stable)
- Approach 4: 0.177 seconds (stable) <--- >10% time in CPython overheads
- Parallel Numba: 0.144 seconds (stable) <--- Multi-threaded

Note that the best speedup is pretty bad (i.e. ~2.6) on a 6-core machine. I guess this might be because the computation is memory-bound (on my PC, the RAM throughput is about 20 GiB/s, while it can reach 32-35 GiB/s at most for such an access pattern).
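
For reference, the ~2.6 figure can be reproduced from the timings listed above, assuming it compares the sequential run of approach 1 (0.338 s) with the fastest run overall (0.129 s, approach 1 with the default multi-threaded OpenBLAS):

```python
cores = 6
t_sequential = 0.338   # Approach 1, sequential OpenBLAS (seconds)
t_best = 0.129         # Approach 1, default multi-threaded OpenBLAS (seconds)

speedup = t_sequential / t_best
efficiency = speedup / cores   # fraction of the ideal 6x speedup
print(f"speedup = {speedup:.2f}x, parallel efficiency = {efficiency:.0%}")
```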

Pool().map: modify the shared object

If your start method is spawn or forkserver, then A is not a shared object in the first place. And if you're on Windows, spawn is the default, and the only choice.

If your start method is fork, then A may be a shared object, but if it is, it isn't actually safe to mutate it without any locks.

As explained in Sharing state between processes, you should try as hard as possible to not need shared objects—it's kind of the whole point of multiprocessing that the processes are isolated from each other—but if you really do need them, you have to do something a bit more complicated.

The first option is using shared memory. In this case, you're using your list as a fixed-sized array of small ints, which you can simulate with an Array('i', [1, 2]), which you can use exactly as in the example in the docs. For more complicated cases, you often need to add a Lock or other synchronization mechanism to protect the shared memory. This is pretty efficient and simple, but it only works when your shared data is something that can be mapped to low-level types like this.
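
A minimal sketch of this first option, assuming each worker simply adds 2 to one slot. The function name add_two is made up for illustration; the Array is handed to Process at creation time, as in the docs example.

```python
from multiprocessing import Process, Array

def add_two(shared, i):
    # get_lock() returns the lock created together with the Array;
    # holding it makes the read-modify-write below atomic.
    with shared.get_lock():
        shared[i] += 2

if __name__ == '__main__':
    A = Array('i', [1, 2])
    workers = [Process(target=add_two, args=(A, i)) for i in range(len(A))]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(A[:])
```

Here each worker touches a different slot, so the lock is not strictly needed; it is shown because it becomes necessary as soon as two processes may update the same element.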

The second option is using a Manager.list([1, 2]), which you can use exactly as in the very next example in the docs. This is a lot less efficient—it works by creating a queue and passing messages back and forth that tell the main process to do the work whenever you want to access or mutate the list—but it has the advantage of being dead simple to use.
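
A corresponding sketch of the second option, again with a made-up add_two worker. Unlike an Array, the list proxy can be pickled, so it is simply sent along with each task.

```python
from multiprocessing import Manager, Pool

def add_two(task):
    # proxy is a list proxy; every access is a round trip to the
    # manager process, which owns the real list.
    proxy, i = task
    proxy[i] += 2

if __name__ == '__main__':
    with Manager() as manager:
        A = manager.list([1, 2])
        with Pool(2) as pool:
            pool.map(add_two, [(A, i) for i in range(len(A))])
        print(list(A))
```

Note that proxy[i] += 2 is a read followed by a write, not one atomic operation; it is safe here only because each task owns its own index.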


But again, it's usually better to not do either of these things, and instead rewrite your code to not need shared data in the first place. Usually this means returning more data from the pool tasks, and then having the main process assemble the returned values in some way. Of course this is tricky if, e.g., other tasks inherently need to see the mutated values. (In such cases, you'd often have to build 80% of what Manager is doing, at which point you might as well just use Manager…). But in your toy example, that isn't the case. (And, in fact, when you think that's unavoidably necessary, it often means you haven't thought through how nondeterminism is going to affect your algorithm, and it wouldn't have worked anyway…)

Here's an example of how you could do this with your toy problem:

import multiprocessing

def square(i, aval):
    # actual return value, i, and value to set A[i] to
    return i*i, i, 2+aval

# the guard is required when the start method is spawn (e.g. on Windows)
if __name__ == '__main__':
    A = [1, 2]
    with multiprocessing.Pool() as pool:
        # pass each A[i] into the function
        for result, i, aval in pool.starmap(square, zip([0, 1], A)):
            # get the new A[i] out of the function and store it
            A[i] = aval
    print(A)

multiprocessing.Pool sharing large lists of lists read-only in memory across child process

It seems to work fine for me using mp.Manager, with an mp.Manager.list of mp.Manager.lists. I believe this will not copy the lists to every process.

The important line is:

big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])

You may want to use instead, depending on your use case:

big_list_of_lists_proxy = manager.list(big_list_of_lists)

Whether every sublist should be a proxy or not depends on whether each sublist is large, and also on whether it is read in its entirety. If a sublist is large, then it is expensive to transfer the list object to each process that needs it (O(n) complexity), so a proxy list from a manager should be used. However, if every element is going to be needed anyway, there is no advantage to using a proxy.

import multiprocessing as mp
from operator import itemgetter
import numpy as np
from functools import partial

def foo(indexes, big_list_of_lists):
    # here I must guarantee read access for big_list_of_lists on every child process somehow
    # as this code would work only with one child process using global variables but would fail
    # with larger data.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    with ctx.Manager() as manager:
        big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
        # big_list_of_lists elements are also passed as args
        pool = ctx.Pool(mp.cpu_count())
        res = list(pool.map(partial(foo, big_list_of_lists=big_list_of_lists_proxy), big_list_of_lists))
        pool.close()
        pool.join()

    return res

if __name__ == '__main__':
    print(main())
    # desired output is equivalent to:
    # a = []
    # for i in big_list_of_lists:
    #     store_tuples = itemgetter(*i)(big_list_of_lists)
    #     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
    # 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]

Python multiprocessing - Reading from / writing to multiple 2D arrays

So after many failures and countless tries, I came up with this, which seems to work:

import numpy as np
from multiprocessing import Pool, RawArray, cpu_count

# A global dictionary storing the variables passed from the initializer.
GLOBAL_DICT = {}

def init_worker(input_array, array_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    GLOBAL_DICT['input_array'] = input_array
    GLOBAL_DICT['array_shape'] = array_shape

def worker_func(i):
    # Simply doubles all entries
    data = np.frombuffer(GLOBAL_DICT['input_array']).reshape(GLOBAL_DICT['array_shape'])
    return data[i, :] * 2

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]])
    array_shape = my_array.shape
    raw_array = RawArray('d', array_shape[0] * array_shape[1])
    # Wrap raw_array as a numpy array.
    shared_array = np.frombuffer(raw_array).reshape(array_shape)
    # Copy my_array to shared array.
    np.copyto(shared_array, my_array)
    # Start the process pool and do the computation.
    # max(1, ...) keeps the pool size valid on machines with two or fewer cores.
    args = (raw_array, array_shape)
    with Pool(processes=max(1, cpu_count() - 2), initializer=init_worker, initargs=args) as pool:
        result = pool.map(worker_func, range(array_shape[0]))
    print(f'Input:\n{my_array}')
    print(f'Result:\n{np.array(result)}')

Prints:

Input:
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]
Result:
[[2. 2. 4. 4.]
 [2. 2. 4. 4.]
 [6. 6. 8. 8.]
 [6. 6. 8. 8.]]

I guess there are more efficient or prettier ways to do that. Ideally, I'd like to write directly to a shared output array, but for now, it works.
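
Writing directly to a shared output array could look roughly like the sketch below: a second RawArray is passed through the initializer, and each task fills in its own row instead of returning it. This is an assumption-laden illustration, not tested against the original problem: the names SHARED and double_row are made up, and it indexes the flat buffers by hand to stay free of dependencies (in a worker, np.frombuffer(out).reshape(shape) would give a writable 2D view, as in the code above).

```python
from multiprocessing import Pool, RawArray

SHARED = {}   # per-process globals filled in by the initializer

def init_worker(inp, out, shape):
    SHARED['inp'], SHARED['out'], SHARED['shape'] = inp, out, shape

def double_row(i):
    # Write row i of the result straight into the shared output buffer.
    # Each task owns exactly one row, so no lock is needed.
    rows, cols = SHARED['shape']
    inp, out = SHARED['inp'], SHARED['out']
    for j in range(i * cols, (i + 1) * cols):
        out[j] = 2 * inp[j]

if __name__ == '__main__':
    shape = (4, 4)
    flat = [1, 1, 2, 2, 1, 1, 2, 2, 3, 3, 4, 4, 3, 3, 4, 4]
    inp = RawArray('d', flat)
    out = RawArray('d', len(flat))   # zero-initialized
    with Pool(2, initializer=init_worker, initargs=(inp, out, shape)) as pool:
        pool.map(double_row, range(shape[0]))
    print(list(out))
```

Nothing is returned from the workers, so no result rows are pickled on the way back; the main process reads the output buffer once the map call has finished.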


