How to Share Numpy Random State of a Parent Process with Child Processes

How to share numpy random state of a parent process with child processes?

Even if you manage to get this working, I don’t think it will do what you want. As soon as you have multiple processes pulling from the same random state in parallel, it’s no longer deterministic which order they each get to the state, meaning your runs won’t actually be repeatable. There are probably ways around that, but it seems like a nontrivial problem.

Meanwhile, there is a simpler approach that addresses both your original problem and the nondeterminism problem:

Before spawning a child process, ask the RNG for a random number, and pass it to the child. The child can then seed with that number. Each child will then have a different random sequence from other children, but the same random sequence that the same child got if you rerun the entire app with a fixed seed.

If your main process does any other RNG work that could depend non-deterministically on the execution of the children, you'll need to pre-generate the seeds for all of your child processes, in order, before pulling any other random numbers.
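A minimal sketch of that idea, assuming a fixed master seed and a pool of workers. The names make_child_seeds, child_task and run are made up for illustration and are not part of NumPy or multiprocessing:

```python
import numpy as np
from multiprocessing import Pool


def make_child_seeds(master_seed, n_children):
    """Draw one integer seed per child, in order, from a seeded parent RNG."""
    parent_rng = np.random.RandomState(master_seed)
    # Draw all seeds up front, before any other random numbers are pulled.
    return [int(s) for s in parent_rng.randint(0, 2**31 - 1, size=n_children)]


def child_task(seed):
    """Seed a generator private to this task and draw from it."""
    rng = np.random.RandomState(seed)
    return rng.uniform(0, 1, 3).tolist()


def run(master_seed, n_children=4):
    seeds = make_child_seeds(master_seed, n_children)
    with Pool(n_children) as pool:
        # map preserves input order, so result i always belongs to seed i.
        return pool.map(child_task, seeds)


if __name__ == "__main__":
    print(run(12345))
```

Because every task builds its own RandomState from the seed it was handed, the result should not depend on which worker picks up which task or in what order the workers run.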


As senderle pointed out in a comment: If you don't need multiple distinct runs, but just one fixed run, you don't even really need to pull a seed from your seeded RNG; just use a counter starting at 1 and increment it for each new process, and use that as a seed. I don't know if that's acceptable, but if it is, it's hard to get simpler than that.

As Amir pointed out in a comment: a better way is to draw a random integer every time you spawn a new process and pass that integer to the new process, which uses it to seed its own NumPy RNG. This integer can indeed come from np.random.randint().

How to propagate random seed to child processes

One way (I think the only practical way) of solving this problem is to come up with a managed random number generator class. You can either pass it to your worker function as an argument (the option chosen here) or use it to initialize each process in the pool as a global variable. I have modified your code slightly so that, instead of printing the random number, function do_thing returns the value. I have also modified the main process to create a pool of size 8 and to invoke do_thing 8 times. Finally, I have added a call to sleep to do_thing. This ensures that each of the 8 pool processes handles one submitted task (I have 8 cores), rather than the first process handling all 8 tasks, which can happen when the submitted job completes very quickly:

from multiprocessing import Pool, current_process
from multiprocessing.managers import BaseManager
import random
from functools import partial

class RandomGeneratorManager(BaseManager):
    pass

class RandomGenerator:
    def __init__(self):
        # Seed once, in the manager process that owns the generator.
        random.seed(0)

    def get_random(self):
        return random.randint(0, 99)

def do_thing(random_generator, upper):
    import time
    time.sleep(.2)
    print(current_process())
    return random_generator.get_random()

# Required for Windows:
if __name__ == '__main__':
    RandomGeneratorManager.register('RandomGenerator', RandomGenerator)
    with RandomGeneratorManager() as manager:
        random_generator = manager.RandomGenerator()
        # random_generator will be the first argument to do_thing:
        worker = partial(do_thing, random_generator)
        with Pool(8) as pool:
            print(pool.map(worker, [0] * 8))
        with Pool(8) as pool:
            print(pool.map(worker, [0] * 8))

Prints:

<SpawnProcess name='SpawnPoolWorker-3' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-2' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-4' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-5' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-7' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-6' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-9' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-8' parent=23956 started daemon>
[49, 97, 53, 5, 33, 65, 51, 62]
<SpawnProcess name='SpawnPoolWorker-14' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-10' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-13' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-11' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-17' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-15' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-16' parent=23956 started daemon>
<SpawnProcess name='SpawnPoolWorker-12' parent=23956 started daemon>
[38, 61, 45, 74, 27, 64, 17, 36]

Python Multiprocessing Numpy Random

Calling g(core) and permuting a copy of the array returns 4 identically 'shuffled' arrays. This seems to indicate that the working copy is not local to the child process.

What it likely indicates is that the random number generator is initialized identically in each child process, producing the same sequence. You need to seed each child's generator (perhaps throwing the child's process id into the mix).
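As a rough illustration of seeding each child's generator, the sketch below gives every task its own seed and shuffles a private copy of the array. The function name shuffled_copy and the base seed 1000 are assumptions for the example, not taken from the question:

```python
import numpy as np
from multiprocessing import Pool


def shuffled_copy(args):
    """Return a shuffled copy of arr, using a generator private to this call."""
    arr, seed = args
    # A fresh RandomState is independent of the global state inherited on fork.
    rng = np.random.RandomState(seed)
    out = arr.copy()
    rng.shuffle(out)
    return out


if __name__ == "__main__":
    arr = np.arange(10)
    base_seed = 1000  # hypothetical; any fixed integer keeps runs repeatable
    tasks = [(arr, base_seed + i) for i in range(4)]
    with Pool(4) as pool:
        for result in pool.map(shuffled_copy, tasks):
            print(result)
```

Each task gets a different seed, so the four permutations are drawn from different streams, while rerunning with the same base seed should reproduce them.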

Making my NumPy array shared across processes

Note that you can start out with an array of structured (compound) dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

and view it as an array of homogeneous dtype:

In [5]: data2 = data.view('float32')

and later, convert it back to the structured dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Changing the dtype is a very quick operation; it does not affect the underlying data, only the way NumPy interprets it. So changing the dtype is virtually costless.

So what you've read about arrays with simple (homogeneous) dtypes can be readily applied to your structured dtype with the trick above.
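A small check of the view trick, using a scaled-down stand-in dtype ('float32, (4,2)float32' with 3 records) so it runs instantly. The sizes are assumptions chosen for illustration:

```python
import numpy as np

# Small stand-in for the original 'float32, (250000,2)float32' dtype.
structured = np.dtype('float32, (4,2)float32')

data = np.zeros(3, dtype=structured)   # 3 records of 1 + 4*2 floats each
flat = data.view('float32')            # same memory, seen as 27 plain floats
back = flat.view(structured)           # same memory, seen as 3 records again

flat[0] = 1.5                          # write through the flat view

print(flat.shape, back.shape)          # (27,) (3,)
print(back['f0'][0])                   # 1.5
print(np.shares_memory(data, back))    # True
```

All three arrays refer to the same buffer, so a write through any of them is visible through the others.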


The code below borrows many ideas from J.F. Sebastian's answer.

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64

def decode(arg):
    chunk, counter = arg
    print(len(chunk), counter)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4  # number of 4-byte values
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            # Reinterpret the unsigned 32-bit integer as a float.
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1

def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    # The shared array holds c_float values, so read it as float32.
    return np.frombuffer(mp_arr.get_obj(), dtype=np.float32)

def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = int(len(peaks) / processors)
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive.
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)

If you can guarantee that the various processes which execute the assignments

    if (index % 2 == 0):
        data[counter][1][peak_counter][0] = float(buff2)
    else:
        data[counter][1][peak_counter][1] = float(buff2)

never compete to alter the data in the same locations, then I believe you can actually forgo using the lock

with shared_arr.get_lock():

but I don't grok your code well enough to know for sure, so to be on the safe side, I included the lock.

Same output in different workers in multiprocessing

I think you'll need to re-seed the random number generator using numpy.random.seed in your do_calculation function.

My guess is that the random number generator (RNG) gets seeded when you import the module. Then, when you use multiprocessing, you fork the current process with the RNG already seeded -- Thus, all your processes are sharing the same seed value for the RNG and so they'll generate the same sequences of numbers.

e.g.:

def do_calculation(data):
    np.random.seed()  # re-seed inside the worker process
    rand = np.random.randint(10)
    print(data, rand)
    return data * 2

python multiprocessing sharing data between separate python processes

You are on the (or a) right track with this.

In a comment, stovfl suggests looking at the remote manager section of the Python multiprocessing Manager documentation (Python2, Python3). As you have observed, each manager has a name-able entity (a socket in /tmp in this case) through which each Python process can connect to a peer Python process. Because these are accessible from any process, however, they each have an access key.

The default key for each Manager is the one for the "main process", and it is a string of 32 random bytes:

class _MainProcess(BaseProcess):

    def __init__(self):
        self._identity = ()
        self._name = 'MainProcess'
        self._parent_pid = None
        self._popen = None
        self._config = {'authkey': AuthenticationString(os.urandom(32)),
                        'semprefix': '/mp'}
        # Note that some versions of FreeBSD only allow named
        # semaphores to have names of up to 14 characters.  Therefore
        # we choose a short prefix.
        #
        # On MacOSX in a sandbox it may be necessary to use a
        # different prefix -- see #19478.
        #
        # Everything in self._config will be inherited by descendant
        # processes.

but you may assign your own key, which you can then know and therefore use from anywhere else.

There are other ways to handle this. For instance, you can use XML RPC to export callable functions from one Python process, callable from anything—not just Python—that can speak XML RPC. See the Python2 or Python3 documentation. Heed this warning (this is the py3k variant but it applies in py2k as well):

Warning: The xmlrpc.client module is not secure against maliciously constructed data. If you need to parse untrusted or unauthenticated data see XML vulnerabilities.

Do not, however, assume that using a multiprocessing.Manager instead of XML RPC secures you against maliciously constructed data. Those are just as vulnerable since they will unpickle arbitrary data. See Attacking Python's pickle for more about this.

Seeding random number generators in parallel programs

If no seed is provided explicitly, numpy.random will seed itself using an OS-dependent source of randomness. Usually it will use /dev/urandom on Unix-based systems (or some Windows equivalent), but if this is not available for some reason then it will seed itself from the wall clock. When a new subprocess is forked, however, it inherits a copy of the parent's already-seeded state rather than seeding itself afresh, so multiple subprocesses can start from the same state, leading to identical random variates being produced by different subprocesses.

Often this correlates with the number of concurrent processes you are running. For example:

import numpy as np
import random
from multiprocessing import Pool

def Foo_np(seed=None):
    # np.random.seed(seed)
    return np.random.uniform(0, 1, 5)

pool = Pool(processes=8)
print(np.array(pool.map(Foo_np, range(20))))

# [[ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.64672339 0.99851749 0.8873984 0.42734339 0.67158796]
# [ 0.64672339 0.99851749 0.8873984 0.42734339 0.67158796]
# [ 0.64672339 0.99851749 0.8873984 0.42734339 0.67158796]
# [ 0.64672339 0.99851749 0.8873984 0.42734339 0.67158796]
# [ 0.64672339 0.99851749 0.8873984 0.42734339 0.67158796]
# [ 0.11283279 0.28180632 0.28365286 0.51190168 0.62864241]
# [ 0.11283279 0.28180632 0.28365286 0.51190168 0.62864241]
# [ 0.28917586 0.40997875 0.06308188 0.71512199 0.47386047]
# [ 0.11283279 0.28180632 0.28365286 0.51190168 0.62864241]
# [ 0.64672339 0.99851749 0.8873984 0.42734339 0.67158796]
# [ 0.11283279 0.28180632 0.28365286 0.51190168 0.62864241]
# [ 0.14463001 0.80273208 0.5559258 0.55629762 0.78814652] <-
# [ 0.11283279 0.28180632 0.28365286 0.51190168 0.62864241]]

You can see that groups of up to 8 processes started out with the same seed, giving me identical random sequences (I've marked the first group with arrows).

Calling np.random.seed() within a subprocess forces that process's global RNG instance to seed itself again from /dev/urandom or the wall clock, which will (probably) prevent you from seeing identical output from multiple subprocesses. Best practice is to explicitly pass a different seed (or numpy.random.RandomState instance) to each subprocess, e.g.:

def Foo_np(seed=None):
    local_state = np.random.RandomState(seed)
    print(local_state.uniform(0, 1, 5))

pool.map(Foo_np, range(20))
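
On newer NumPy versions (1.17 and later, which is an assumption about your environment), one possible variant of the same idea is to derive the per-task seeds from a single master seed with np.random.SeedSequence. The helper names make_streams and foo_np below are made up for this sketch:

```python
import numpy as np
from multiprocessing import Pool


def make_streams(master_seed, n):
    """Spawn n child seed sequences from one master seed."""
    return np.random.SeedSequence(master_seed).spawn(n)


def foo_np(child_seq):
    """Draw 5 uniform samples from a Generator private to this task."""
    rng = np.random.default_rng(child_seq)
    return rng.uniform(0, 1, 5)


if __name__ == "__main__":
    with Pool(processes=8) as pool:
        print(np.array(pool.map(foo_np, make_streams(42, 20))))
```

Here the parent only has to remember one integer, and each task receives its own child sequence, so rows should differ from one another while the whole run stays repeatable.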

I'm not entirely sure what underlies the differences between random and numpy.random in this respect (perhaps random has slightly different rules for selecting a source of randomness to self-seed with compared to numpy.random?). I would still recommend explicitly passing a seed or a random.Random instance to each subprocess to be on the safe side. In Python 2 you could also use the .jumpahead() method of random.Random, which is designed for shuffling the states of Random instances in multithreaded programs; it was removed in Python 3.


