Shared-Memory Objects in Multiprocessing

If you use an operating system that uses copy-on-write fork() semantics (like any common unix), then as long as you never alter your data structure it will be available to all child processes without taking up additional memory. You will not have to do anything special (except make absolutely sure you don't alter the object).
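
As a minimal sketch of that (assuming a fork-capable platform such as Linux; big_data and worker are illustrative names, not from the question):

import multiprocessing as mp

# Created before the fork, so child processes see it via copy-on-write.
big_data = list(range(10_000_000))

def worker(i):
    # Read-only access: the pages are not copied as long as nothing writes to them
    # (CPython reference-count updates can still dirty some pages).
    return big_data[i]

if __name__ == '__main__':
    mp.set_start_method('fork')  # explicit; fork is already the default on Linux
    with mp.Pool(4) as pool:
        print(pool.map(worker, [0, 1, 2, 3]))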

The most efficient thing you can do for your problem would be to pack your array into an efficient array structure (using numpy or the array module), place that in shared memory, wrap it with multiprocessing.Array, and pass that to your functions. This answer shows how to do that.
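
A minimal sketch of that pattern (read-only use, hence lock=False; init_worker and read_sum are illustrative names):

import numpy as np
from multiprocessing import Pool, Array

def init_worker(mp_arr):
    # Re-wrap the shared buffer as a numpy array in each worker (no copy).
    global shared
    shared = np.frombuffer(mp_arr, dtype='d')

def read_sum(bounds):
    lo, hi = bounds
    return shared[lo:hi].sum()

if __name__ == '__main__':
    mp_arr = Array('d', 1_000_000, lock=False)  # shared ctypes buffer, no lock needed for read-only use
    arr = np.frombuffer(mp_arr, dtype='d')      # numpy view over the same memory
    arr[:] = np.random.rand(1_000_000)

    with Pool(4, initializer=init_worker, initargs=(mp_arr,)) as p:
        print(p.map(read_sum, [(0, 500_000), (500_000, 1_000_000)]))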

If you want a writeable shared object, then you will need to wrap it with some kind of synchronization or locking. multiprocessing provides two ways of doing this: shared memory (suitable for simple values, arrays, or ctypes) or a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).
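
A minimal sketch of the shared-memory route, using Value and Array with their built-in locks (the names counter, data and work are illustrative):

from multiprocessing import Process, Value, Array

def work(counter, data):
    # Value and Array carry a lock by default; get_lock() gives explicit control.
    with counter.get_lock():
        counter.value += 1
    with data.get_lock():
        data[0] += 1.0

if __name__ == '__main__':
    counter = Value('i', 0)        # shared int
    data = Array('d', [0.0] * 4)   # shared array of doubles
    procs = [Process(target=work, args=(counter, data)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value, data[:])  # 4 [4.0, 0.0, 0.0, 0.0]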

The Manager approach can be used with arbitrary Python objects, but will be slower than the equivalent using shared memory because the objects need to be serialized/deserialized and sent between processes.
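
A minimal sketch of the Manager route for comparison; it handles arbitrary picklable objects but pays the proxy/IPC cost on every access (names are illustrative):

from multiprocessing import Manager, Process

def work(shared_dict, shared_list, i):
    # Each access goes through a proxy to the manager process (pickling + IPC),
    # which is why this is slower than true shared memory.
    shared_dict[i] = i * i
    shared_list.append(i)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        lst = manager.list()
        procs = [Process(target=work, args=(d, lst, i)) for i in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(dict(d), sorted(lst))  # e.g. {0: 0, 1: 1, 2: 4, 3: 9} [0, 1, 2, 3]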

There is a wealth of parallel processing libraries and approaches available in Python. multiprocessing is an excellent and well-rounded library, but if you have special needs, one of the other approaches may be better.

Sharing array of objects with Python multiprocessing

So, I did a bit of research (Shared Memory Objects in Multiprocessing) and came up with a few ideas:

Pass numpy array of bytes

Serialize the objects, then save them as byte strings to a numpy array. The problems here are that

  1. One needs to pass the data type from the creator of 'psm_test0' to any consumer of 'psm_test0'. This could be done with another shared memory block, though (a sketch follows the consumer code below).

  2. Pickling and unpickling are essentially like deepcopy, i.e. they actually copy the underlying data.

The code for the 'main' process reads:

import pickle
from multiprocessing import shared_memory
import numpy as np

# a simplistic class example
class A:
    def __init__(self, x):
        self.x = x

    def pickle(self):
        return pickle.dumps(self)

    @classmethod
    def unpickle(cls, bts):
        return pickle.loads(bts)

if __name__ == '__main__':
    # Test pickling procedure
    a = A(1)
    print(A.unpickle(a.pickle()).x)
    # >>> 1

    # numpy array of byte strings
    a_arr = np.array([
        A(1).pickle(),
        A(2).pickle(),
        A('This is a really long test string which should exceed 42 bytes').pickle(),
    ])

    # create a shared memory instance
    shm = shared_memory.SharedMemory(
        create=True,
        size=a_arr.nbytes,
        name='psm_test0'
    )

    # numpy array backed by shared memory
    b_arr = np.ndarray(a_arr.shape, dtype=a_arr.dtype, buffer=shm.buf)

    # copy the original data into shared memory
    b_arr[:] = a_arr[:]

    print(b_arr.dtype)
    # 'S105'

and for the consumer

import numpy as np
from multiprocessing import shared_memory
from test import A

# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')

c = np.ndarray((3,), dtype='S105', buffer=existing_shm.buf)

# Test data transfer
arr = [a.x for a in list(map(A.unpickle, c))]
print(arr)
# [1, 2, ...]
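
As noted in point 1 above, the consumer has to hard-code dtype='S105'. A hedged sketch of publishing the dtype through a second shared memory block (the block name 'psm_test0_dtype' is arbitrary, not part of the original code):

from multiprocessing import shared_memory
import numpy as np

# Creator side: write the dtype string into its own block.
dtype_bytes = str(np.dtype('S105')).encode()  # in the example above: str(a_arr.dtype).encode()
meta = shared_memory.SharedMemory(create=True, size=len(dtype_bytes), name='psm_test0_dtype')
meta.buf[:len(dtype_bytes)] = dtype_bytes

# Consumer side: read the dtype back before attaching to 'psm_test0'.
# The block may be rounded up to a page size, so strip trailing null bytes.
meta_ro = shared_memory.SharedMemory(name='psm_test0_dtype')
dtype = np.dtype(bytes(meta_ro.buf[:]).rstrip(b'\x00').decode())
print(dtype)  # |S105

# Clean up: every process closes, the creator unlinks.
meta_ro.close()
meta.close()
meta.unlink()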

I'd say you have a few ways of going forward:

  1. Stay with simple data types.

  2. Implement something using the C API, but I can't really help you there.

  3. Use Rust

  4. Use Managers. You may lose out on some performance (I'd like to see a real benchmark, though), but you get a relatively safe and simple interface for shared objects.

  5. Use Redis, which also has Python bindings... (a sketch follows this list).
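
For option 5, a minimal sketch assuming a Redis server on localhost and the redis-py package (both are assumptions, not part of the question):

import pickle
import redis  # assumes the redis-py package is installed

r = redis.Redis(host='localhost', port=6379)

# Producer: store a pickled object under a key.
r.set('my_object', pickle.dumps({'x': 1, 'msg': 'hello'}))

# Consumer (possibly another process or another machine): fetch and unpickle it.
obj = pickle.loads(r.get('my_object'))
print(obj)  # {'x': 1, 'msg': 'hello'}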

How to use shared memory instead of passing objects via pickling between multiple processes

Use files!

No, really, use files -- they are efficient (the OS will cache the content) and allow you to work on much larger problems (the data set doesn't have to fit into RAM).

Use any of https://docs.scipy.org/doc/numpy-1.15.0/reference/routines.io.html to dump/load numpy arrays to/from files and only pass file names between the processes.

P.S. Benchmark the serialisation methods: depending on the intermediate array size, the fastest could be "raw" (no conversion overhead), "compressed" (if the file ends up being written to disk), or something else. IIRC, loading "raw" files may require knowing the data format (dimensions, sizes) in advance.
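
A minimal sketch of the file-based approach using np.save/np.load, where only the file name travels between processes (the path and the worker name are illustrative):

import os
import tempfile
import numpy as np
from multiprocessing import Pool

def worker(path):
    # mmap_mode='r' maps the file instead of reading it all into RAM.
    arr = np.load(path, mmap_mode='r')
    return float(arr.sum())

if __name__ == '__main__':
    path = os.path.join(tempfile.gettempdir(), 'shared_arr.npy')  # illustrative location
    np.save(path, np.random.rand(1_000_000))

    with Pool(4) as p:
        print(p.map(worker, [path] * 4))  # only the file name is passed around

    os.remove(path)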

python shared memory with pool

This works. I don't have a clear understanding of all the facts yet, though.

1- The shared memory object is created:

shm = shared_memory.SharedMemory(create=True, size=10000000*4)

2- A numpy array is declared on top of that buffer:

b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)

3- The numpy array is populated by copying data into it:

b[:] = np.random.randint(100, size=10000000, dtype=np.int32)

Then, all that the function executed on the worker processes needs is the name of the shared memory object; repeating step 2 inside the function maps the same shared memory, which was populated earlier.

It's essential that every process closes the shared memory object after accessing it, and that the creator unlinks it at the very end.

import numpy as np
from multiprocessing import shared_memory, Pool
import os

def test_function(args):
    Input, shm_name, size = args
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    d = np.ndarray(size, dtype=np.int32, buffer=existing_shm.buf)
    #print(Input, d[Input-1:Input+2])
    d[Input] = -20
    #print(Input, d[Input-1:Input+2])
    existing_shm.close()
    print(Input, 'parent process:', os.getppid())
    print(Input, 'process id:', os.getpid())

if __name__ == '__main__':

    shm = shared_memory.SharedMemory(create=True, size=10000000*4)
    b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
    b[:] = np.random.randint(100, size=10000000, dtype=np.int32)

    # one (index, shared-memory name, shape) triple per task
    inputs = [[i, shm.name, b.shape] for i in range(1, 14)]

    with Pool(os.cpu_count()) as p:
        p.map(test_function, inputs)

    print(b[:20])

    # Clean up from within the first Python shell
    shm.close()
    shm.unlink()  # Free and release the shared memory block at the very end

Passing shared memory variables in python multiprocessing

When I define arr = np.zeros(4), which processor owns this variable?

Only the main process should have access to this. If you use "fork" for the start method, everything will be accessible to the child process, but as soon as something tries to modify it, it will be copied to its own private memory space before being modified (copy-on-write). This reduces overhead if you have large read-only arrays, but doesn't help you much for writing data back to those arrays.

what is being sent if this variable is not defined on those processors.

A new array is created within the child process when the arguments are re-constructed after being sent from the main process via a pipe and pickle. The data is serialized and re-constructed, so no information other than the values in the slice remains; it's a totally new object.
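
A tiny demonstration of that point (illustrative code, not the question's examples): writes made to the reconstructed copy never reach the parent's array.

import numpy as np
from multiprocessing import Pool

def do_stuff(chunk):
    # 'chunk' was pickled, piped and rebuilt here; it shares no memory with the parent.
    chunk[:] = -1
    return chunk

if __name__ == '__main__':
    arr = np.zeros(4)
    with Pool(2) as p:
        results = p.map(do_stuff, [arr[i:i + 1] for i in range(4)])
    print(arr)                      # still [0. 0. 0. 0.]
    print(np.concatenate(results))  # [-1. -1. -1. -1.]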

Why doesn't example 2 work while example 3 does?

Example 3 works because at the time of "fork" (the moment you call Pool), arr has already been created, and will be shared. It's also important that you used an Array to create it, so when you attempt to modify the data, the change is shared (the exact mechanics of this are complicated).

Example 2 fails for the same reason example 1 fails: you pass a slice of an array as an argument, which gets converted into a totally new object, so arr inside your do_stuff function is just a copy of arr[i:i+1] from the main process. It is still important to create anything which will be shared between processes before calling Pool (if you're relying on "fork" to share the data), but that's not why this example doesn't work.

You should know: example 3 only works because you're on Linux, where the default start method is fork. This is not the preferred start method due to the possibility of deadlocks from copying lock objects in a locked state. It will not work on Windows at all, and won't work on macOS by default on 3.8 and above.

The best (most portable) solution to all this is to pass the Array itself as the argument, and re-construct the numpy array inside the child process. This has the complication that "shared objects" can only be passed as arguments at the creation of the child process. This isn't as big a deal if you use Process, but with Pool, you basically have to pass any shared objects as arguments to an initialization function, and get the re-constructed array as a global variable of the child's scope. In the example below, for instance, you will get an error trying to pass buf as an argument with p.map or p.apply, but not when passing it as initargs=(buf,) to Pool().

import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr  # use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i] = i

if __name__ == '__main__':
    idx = [0, 1, 2, 3]

    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0

    # "with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,))  # you could pass more args to get slice indices too
    print(arr)

With 3.8 and above there's a new module, multiprocessing.shared_memory, which is better than Array or any of the other sharedctypes classes. It's a bit more complicated to use, and has some additional OS-dependent nastiness, but it's theoretically lower overhead and faster. If you want to go down the rabbit hole, I've written a few answers on the topic of shared_memory, and have recently been answering lots of questions about concurrency in general, if you want to take a gander at my answers from the last month or two.
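
One way to tame that OS-dependent cleanup is SharedMemoryManager from multiprocessing.managers, which unlinks its blocks automatically when the with block exits. A hedged sketch, not the answer's own code (worker is an illustrative name):

import numpy as np
from multiprocessing import Pool, shared_memory
from multiprocessing.managers import SharedMemoryManager

def worker(args):
    name, shape = args
    shm = shared_memory.SharedMemory(name=name)  # attach by name
    view = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
    total = float(view.sum())
    shm.close()  # each process still closes its own handle
    return total

if __name__ == '__main__':
    with SharedMemoryManager() as smm:
        shm = smm.SharedMemory(size=1_000_000 * 8)  # 1e6 float64 values
        arr = np.ndarray((1_000_000,), dtype=np.float64, buffer=shm.buf)
        arr[:] = np.random.rand(1_000_000)

        with Pool(4) as p:
            print(p.map(worker, [(shm.name, arr.shape)] * 4))
    # leaving the SharedMemoryManager block closes and unlinks the segment for us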


