Share Large, Read-Only Numpy Array Between Multiprocessing Processes

@Velimir Mlaker gave a great answer. I thought I could add a few comments and a tiny example.

(I couldn't find much documentation on sharedmem - these are the results of my own experiments.)

  1. Do you need to pass the handles when the subprocess is starting, or after it has started? If it's just the former, you can use the target and args arguments of Process. This is potentially better than using a global variable.
  2. From the discussion page you linked, it appears that support for 64-bit Linux was added to sharedmem a while back, so it could be a non-issue.
  3. I don't know about this one.
  4. No. Refer to the example below.

Example

#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy

def do_work(data, start):
    data[start] = 0

def split_work(num):
    n = 20
    width = n / num
    shared = sharedmem.empty(n)
    shared[:] = numpy.random.rand(1, n)[0]
    print "values are %s" % shared

    processes = [Process(target=do_work, args=(shared, i * width)) for i in xrange(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print "values are %s" % shared
    print "type is %s" % type(shared[0])

if __name__ == '__main__':
    split_work(4)

Output

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
0.44056863 0.10701816 0.67167752 0.29158274 0.22242552 0.14273156
0.34912309 0.43812636 0.58484507 0.81697513 0.57758441 0.4284959
0.7292129 0.06063283]
values are [ 0. 0.59667692 0.10761908 0.6736734 0.46349645 0.
0.44056863 0.10701816 0.67167752 0.29158274 0. 0.14273156
0.34912309 0.43812636 0.58484507 0. 0.57758441 0.4284959
0.7292129 0.06063283]
type is <type 'numpy.float64'>

This related question might be useful.

python3 multiprocess shared numpy array(read-only)

If you absolutely must use Python multiprocessing, then you can use Python multiprocessing along with Arrow's Plasma object store to store the object in shared memory and access it from each of the workers. See this example, which does the same thing using a Pandas dataframe instead of a numpy array.
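
Since the linked example uses a dataframe, here is a rough numpy-based sketch of the Plasma approach. It is only an illustration: it assumes an older pyarrow release that still ships pyarrow.plasma (Plasma has since been removed from pyarrow) and that a plasma store is already running at /tmp/plasma (for example started with plasma_store -m 1000000000 -s /tmp/plasma); the function names are mine, not from the linked example.

import multiprocessing as mp
import numpy as np
import pyarrow.plasma as plasma  # only available in older pyarrow releases

def worker(object_id_bytes):
    # Each worker connects to the same store and gets a zero-copy,
    # read-only view of the array held in shared memory.
    client = plasma.connect("/tmp/plasma")
    data = client.get(plasma.ObjectID(object_id_bytes))
    return float(data.sum())

if __name__ == "__main__":
    client = plasma.connect("/tmp/plasma")
    # Put the array into the Plasma object store once.
    object_id = client.put(np.zeros(10**7))
    # Pass only the 20-byte object ID to the workers, not the array itself.
    with mp.Pool(4) as pool:
        print(pool.map(worker, [object_id.binary()] * 4))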

If you don't absolutely need to use Python multiprocessing, you can do this much more easily with Ray. One advantage of Ray is that it will work out of the box not just with arrays but also with Python objects that contain arrays.

Under the hood, Ray serializes Python objects using Apache Arrow, which is a zero-copy data layout, and stores the result in Arrow's Plasma object store. This allows worker tasks to have read-only access to the objects without creating their own copies. You can read more about how this works in the Ray documentation.

Here is a modified version of your example that runs.

import numpy as np
import ray

ray.init()

@ray.remote
def worker_func(data, i):
    # Do work. This function will have read-only access to
    # the data array.
    return 0

data = np.zeros(10**7)
# Store the large array in shared memory once so that it can be accessed
# by the worker tasks without creating copies.
data_id = ray.put(data)

# Run worker_func 10 times in parallel. This will not create any copies
# of the array. The tasks will run in separate processes.
result_ids = []
for i in range(10):
    result_ids.append(worker_func.remote(data_id, i))

# Get the results.
results = ray.get(result_ids)

Note that if we omitted the line data_id = ray.put(data) and instead called worker_func.remote(data, i), then the data array would be stored in shared memory once per function call, which would be inefficient. By first calling ray.put, we can store the object in the object store a single time.

Sharing numpy array between processes collecting data (populating array) and parsing data (array operations) with both processes as class methods

Sharing the flags

You can use multiprocessing.Value to store the flags in shared memory. These need to be created in the main process and then passed to the child processes, which keep a reference to them. Furthermore, to access or change a flag, you need to use its value attribute rather than comparing the Value object directly against other objects.

Keep in mind that shared memory is not synchronized on its own, but multiprocessing.Value accepts the keyword argument lock=True (the default) at creation so that access goes through an internal lock. Sample change in your code:

class DataCaptureThread():
    def __init__(self, capture_flag, outgoing_flag, duration, parameters):
        .
        .
        self.capture_flag = capture_flag
        self.outgoing_data_open_flag = outgoing_flag
        .

    def read_data(self, duration):
        if self.capture_flag.value:
            while True:
                # Pretend this function gets entire dataset from socket and saves file
                self.data_array = ReadSocket()
                # Wait for other process to indicate it's ready to receive array
                while not self.outgoing_data_open_flag.value:
                    continue

class SensorInterface():
    .
    .
    .
    def collect_data(self, duration):
        self.capture_stream.duration = duration
        self.capture_stream.capture_flag.value = True

class DataHandler():
    def __init__(self, incoming_data_ready_flag, data_full):
        # Flag to indicate that sensor process is not ready to transfer data
        self.incoming_data_ready_flag = incoming_data_ready_flag
        # Flag to indicate that process is currently busy with data, cannot accept more
        self.data_full = data_full
        self.data_full.value = False
        .

    def run_processing(self):
        while True:
            # Wait until data is ready
            while not self.incoming_data_ready_flag.value:
                continue
            # Set flag to indicate process is busy with data
            self.data_full.value = True
            # Pretend this function encapsulates all data processing and saving
            DataProcessing(self.data_array)
            # Reset flag
            self.data_full.value = False

if __name__ == '__main__':
    import multiprocessing as mp
    from ctypes import c_bool

    # Create flags and set their initial values
    outgoing_data_ready_flag = mp.Value(c_bool, False)
    data_full = mp.Value(c_bool, False)
    incoming_data_ready_flag = mp.Value(c_bool, False)

    # Create objects (remember to pass these flags to them as arguments!)
    .
    .
    .
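
For reference, here is a minimal, self-contained sketch of the same flag pattern in isolation, showing the Value being created in the main process and passed to the children (the producer/consumer names and the sleep calls are just illustrative):

import multiprocessing as mp
from ctypes import c_bool
import time

def producer(ready_flag):
    time.sleep(0.1)           # pretend to collect data
    ready_flag.value = True   # signal the consumer through shared memory

def consumer(ready_flag):
    while not ready_flag.value:  # poll the shared flag
        time.sleep(0.01)
    print("data is ready")

if __name__ == '__main__':
    ready_flag = mp.Value(c_bool, False)  # created once in the main process
    p1 = mp.Process(target=producer, args=(ready_flag,))
    p2 = mp.Process(target=consumer, args=(ready_flag,))
    p1.start(); p2.start()
    p1.join(); p2.join()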

Sharing the array

As for communicating the contents of the actual arrays, it is hard to suggest the best way since you have abstracted away how you get the data in your pseudocode. Normally, a Queue would be a good way to go about this, but you can also use a Pipe if there is going to be only one consumer and one producer writing data from one end. Unlike queues, pipes are not thread-safe, but they are generally faster. If you do decide to use them, they also need to be passed to the child processes from the main process.
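
For illustration, here is a minimal sketch of the queue approach with one producer and one consumer, using a None sentinel to signal the end of the stream (the names and the dummy data are placeholders, not from your code):

import multiprocessing as mp
import numpy as np

def producer(queue):
    # Pretend this array came from the socket read.
    queue.put(np.random.rand(1000))
    queue.put(None)  # sentinel: tells the consumer there is no more data

def consumer(queue):
    while True:
        arr = queue.get()  # blocks until the producer puts something
        if arr is None:
            break
        print("received array with mean", arr.mean())

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=producer, args=(q,))
    p2 = mp.Process(target=consumer, args=(q,))
    p1.start(); p2.start()
    p1.join(); p2.join()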

Additionally, you can create shared memory blocks with multiprocessing.shared_memory and store the array in them directly. Any child process can attach to such a block and access its contents, which makes this much faster than queues and pipes, with the trade-off that shared memory is a little more complicated to deal with, which I think you are trying to avoid.

Edit

Regarding the method used to share the arrays, it is mostly a question of how much flexibility you are willing to trade for speed. For example, here are some questions you should ask yourself:

  1. How many different processes will need the contents of the array once it's populated? If it's more than one, then using Queues/Pipes becomes expensive, since you would need to create one for each process to ensure every one of them gets the required data.
  2. What is the content of the arrays? Remember that all data transferred from one process to another (for example through queues/pipes) needs to be pickled. So if the object being put in the queue is fairly complex (or big), pickling and then unpickling adds extra overhead. On the flip side, shared memory is usually very restrictive about what it can and can't store (more about this below).
  3. How much time am I willing to spend on this? Like I said, queues and pipes are very straightforward to implement; you could do it in half an evening and call it a day. They are also very flexible, since you can put almost anything picklable in them, which makes it easier on future you if the requirements for the code change. Shared memory, on the other hand, comes with its usual set of headaches and bottlenecks (thread-safety being both of them). If you don't want to go as low-level as working with memory blocks, there is also multiprocessing.Array, which works similarly to mp.Value (see the sketch after this list). But again, it is restrictive in what it can store (I have never used this, so there may be some workarounds that I am not aware of; take it with a grain of salt).
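
As a rough sketch of the multiprocessing.Array option mentioned in point 3 (the worker logic here is purely illustrative): the shared buffer is created once in the main process and wrapped as a numpy view in each child, so no array data is pickled or copied.

import multiprocessing as mp
import numpy as np
from ctypes import c_double

def worker(shared_arr, i):
    # Wrap the shared ctypes buffer in a numpy view; no data is copied.
    with shared_arr.get_lock():
        view = np.frombuffer(shared_arr.get_obj())
        view[i] = i

if __name__ == '__main__':
    shared_arr = mp.Array(c_double, 10)  # zero-initialized, with an internal lock
    procs = [mp.Process(target=worker, args=(shared_arr, i)) for i in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(np.frombuffer(shared_arr.get_obj()))  # [0. 1. 2. ... 9.]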

Is it possible to share a numpy array that's not empty between processes?

Here's an example of how to use shared_memory with numpy. It was pasted together from several of my other answers, but there are a couple of pitfalls to keep in mind with shared_memory:

  • When you create a numpy ndarray from an shm object's buffer, the array does not keep the shm alive, so nothing prevents the shm from being garbage collected. The unfortunate side effect of this is that the next time you try to access the array, you get a segfault. Borrowing from another question, I created a quick ndarray subclass that simply attaches the shm as an attribute, so a reference sticks around and it doesn't get GC'd.
  • Another pitfall is that on Windows, the OS tracks when to delete the memory rather than giving you access to do so. That means that even if you don't call unlink, the memory will get deleted once there are no active references to that particular segment of memory (identified by its name). The way to solve this is to keep an shm open in the main process that outlives all the child processes. Calling close and unlink at the end keeps that reference alive until the end and makes sure you don't leak memory on other platforms.

import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory

class SHMArray(np.ndarray): # copied from https://numpy.org/doc/stable/user/basics.subclassing.html#slightly-more-realistic-example-attribute-added-to-existing-array
    '''An ndarray subclass that holds on to a ref of shm so it doesn't get garbage collected too early.'''
    def __new__(cls, input_array, shm=None):
        obj = np.asarray(input_array).view(cls)
        obj.shm = shm
        return obj

    def __array_finalize__(self, obj):
        if obj is None: return
        self.shm = getattr(obj, 'shm', None)

def child_func(name, shape, dtype):
    shm = SharedMemory(name=name)
    arr = SHMArray(np.ndarray(shape, buffer=shm.buf, dtype=dtype), shm)
    arr[:] += 5
    shm.close() # be sure to clean up your shm's locally when they're not needed (referring to arr after this will segfault)

if __name__ == "__main__":
    shape = (10,) # 1d array, 10 elements long
    dtype = 'f4'  # 32-bit floats
    dummy_array = np.ndarray(shape, dtype=dtype) # dummy array to calculate nbytes
    shm = SharedMemory(create=True, size=dummy_array.nbytes)
    arr = np.ndarray(shape, buffer=shm.buf, dtype=dtype) # create the real arr backed by the shm
    arr[:] = 0
    print(arr) # should print arr full of 0's
    p1 = mp.Process(target=child_func, args=(shm.name, shape, dtype))
    p1.start()
    p1.join()
    print(arr) # should print arr full of 5's
    shm.close()  # be sure to clean up your shm's
    shm.unlink() # call unlink when the actual memory can be deleted

