Use Numpy Array in Shared Memory For Multiprocessing

Passing shared memory variables in python multiprocessing

When I define arr = np.zeros(4), which process owns this variable?

Only the main process should have access to this. If you use "fork" for the start method, everything will be accessible to the child process, but as soon as something tries to modify it, it will be copied into the child's own private memory space before being modified (copy-on-write). This reduces overhead if you have large read-only arrays, but doesn't help you much for writing data back to those arrays.
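
Here's a minimal sketch of that copy-on-write behavior (assuming a platform where "fork" is available, e.g. Linux): the child can read the inherited array, but its writes never make it back to the parent.

import numpy as np
import multiprocessing as mp

arr = np.zeros(4)  # created in the main process, before any fork happens

def child():
    print(arr)   # readable: the child inherited the parent's memory via fork
    arr[0] = 99  # this write goes to the child's private copy (copy-on-write)

if __name__ == '__main__':
    mp.set_start_method('fork')  # explicit for clarity; already the default on Linux
    p = mp.Process(target=child)
    p.start()
    p.join()
    print(arr)  # still all zeros: the parent never sees the child's modification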

What is being sent if this variable is not defined on those processes?

A new array is created within the child process when the arguments are re-constructed after being sent from the main process via a pipe and pickle. The data is serialized to a stream of bytes and re-constructed, so no information other than the values in the slice remains; it's a totally new object.
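
As a tiny illustration of what "sent via a pipe and pickle" means, you can round-trip a slice through pickle yourself (a sketch, not literally what multiprocessing does internally, but close in spirit):

import pickle
import numpy as np

arr = np.zeros(4)
payload = pickle.dumps(arr[1:3])  # roughly what gets written to the pipe
rebuilt = pickle.loads(payload)   # roughly what the child process receives

print(rebuilt)              # same values as arr[1:3]...
print(rebuilt.base is arr)  # False: it shares no memory with the original arr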

Why doesn't example 2 work while example 3 does?

Example 3 works because at the time of the "fork" (the moment you call Pool), arr has already been created, so it is inherited by the children. It's also important that you used an Array to create it: that's what makes your modifications land in shared memory rather than in a private copy (the exact mechanics of this are complicated).
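
Roughly, the pattern that makes it work looks like this (a hedged reconstruction, since the asker's code isn't reproduced here): the Array exists before Pool forks, and the workers write through the inherited global rather than through an argument.

import numpy as np
from multiprocessing import Pool, Array

buf = Array('d', 4)                            # created before the fork
arr = np.frombuffer(buf.get_obj(), dtype='d')  # numpy view of the shared buffer

def do_stuff(i):
    arr[i] = i  # writes land in the shared buffer, so the parent sees them

if __name__ == '__main__':
    with Pool(4) as p:
        p.map(do_stuff, range(4))
    print(arr)  # [0. 1. 2. 3.] on a fork-based platform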

Example 2 fails for the same reason example 1 does: you pass a slice of the array as an argument, which gets converted into a totally new object, so arr inside your do_stuff function is just a copy of arr[i:i+1] from the main process. It is still important to create anything that will be shared between processes before calling Pool (if you're relying on "fork" to share the data), but that's not why this example doesn't work.
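
A sketch of that failure mode (again a hypothetical reconstruction, since the original example isn't shown here): the slices sent to the workers are copies, so the writes never come back.

import numpy as np
from multiprocessing import Pool

def do_stuff(chunk):
    chunk[:] = 5  # only the child's reconstructed copy changes

if __name__ == '__main__':
    arr = np.zeros(4)
    with Pool(4) as p:
        p.map(do_stuff, [arr[i:i+1] for i in range(4)])
    print(arr)  # still [0. 0. 0. 0.]: the writes never made it back to the parent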

You should know: example 3 only works because you're on Linux, where the default start method is fork. Fork is not the preferred start method because of the possibility of deadlocks caused by copying lock objects in a locked state. It will not work on Windows at all, and won't work on macOS by default on 3.8 and above.
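
If you're unsure what you're getting, you can check (and explicitly request) a start method; a small sketch, keeping in mind that "fork" simply isn't available on Windows:

import multiprocessing as mp

if __name__ == '__main__':
    print(mp.get_start_method())   # 'fork' on Linux, 'spawn' on Windows and macOS (3.8+)
    ctx = mp.get_context('spawn')  # request a specific method via a context
    p = ctx.Process(target=print, args=('hello from a spawned child',))
    p.start()
    p.join()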

The most portable solution to all of this is to pass the Array itself as the argument and re-construct the numpy array inside the child process. The complication is that "shared objects" can only be passed as arguments at the creation of the child process. That isn't a big deal if you use Process, but with Pool you basically have to pass any shared objects as arguments to an initializer function and access the re-constructed array as a global variable in the child's scope. In the example below, for instance, you will get an error trying to pass buf as an argument with p.map or p.apply, but not when passing it as initargs=(buf,) to Pool().

import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr  # use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i] = i

if __name__ == '__main__':
    idx = [0, 1, 2, 3]

    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0

    # "with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,))  # you could pass more args to get slice indices too
        print(arr)

With 3.8 and above there's a new module, shared_memory, which is better than Array or any of the other sharedctypes classes. It's a bit more complicated to use, and has some additional OS-dependent nastiness, but it's theoretically lower overhead and faster. If you want to go down the rabbit hole I've written a few answers on the topic of shared_memory, and have recently been answering lots of questions on concurrency in general if you want to take a gander at my answers from the last month or two.

Is it possible to share a numpy array that's not empty between processes?

Here's an example of how to use shared_memory using numpy. It was pasted together from several of my other answers, but there are a couple pitfalls to keep in mind with shared_memory:

  • When you create a numpy ndarray from a shm object, it doesn't prevent the shm from being garbage collected. The unfortunate side effect is that the next time you try to access the array, you get a segfault. To solve this for another question I created a quick ndarray subclass that just attaches the shm as an attribute, so a reference sticks around and it doesn't get GC'd.
  • Another pitfall is that on Windows, the OS tracks when to delete the memory rather than giving you access to do so. That means that even if you don't call unlink, the memory will get deleted if there are no active references to that particular segment of memory (identified by its name). The way to solve this is to make sure you keep an shm open in the main process that outlives all child processes. Calling close and unlink at the end keeps that reference until the end, and makes sure you don't leak memory on other platforms.
import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory

class SHMArray(np.ndarray):  # copied from https://numpy.org/doc/stable/user/basics.subclassing.html#slightly-more-realistic-example-attribute-added-to-existing-array
    '''an ndarray subclass that holds on to a ref of shm so it doesn't get garbage collected too early.'''
    def __new__(cls, input_array, shm=None):
        obj = np.asarray(input_array).view(cls)
        obj.shm = shm
        return obj

    def __array_finalize__(self, obj):
        if obj is None: return
        self.shm = getattr(obj, 'shm', None)

def child_func(name, shape, dtype):
    shm = SharedMemory(name=name)
    arr = SHMArray(np.ndarray(shape, buffer=shm.buf, dtype=dtype), shm)
    arr[:] += 5
    shm.close()  # be sure to clean up your shm's locally when they're not needed (referring to arr after this will segfault)

if __name__ == "__main__":
    shape = (10,)  # 1d array 10 elements long
    dtype = 'f4'   # 32 bit floats
    dummy_array = np.ndarray(shape, dtype=dtype)  # dummy array to calculate nbytes
    shm = SharedMemory(create=True, size=dummy_array.nbytes)
    arr = np.ndarray(shape, buffer=shm.buf, dtype=dtype)  # create the real arr backed by the shm
    arr[:] = 0
    print(arr)  # should print arr full of 0's
    p1 = mp.Process(target=child_func, args=(shm.name, shape, dtype))
    p1.start()
    p1.join()
    print(arr)  # should print arr full of 5's
    shm.close()   # be sure to clean up your shm's
    shm.unlink()  # call unlink when the actual memory can be deleted


Related Topics



Leave a reply



Submit