How to Use a Multiprocessing.Manager()

How to use a multiprocessing.Manager()?

Manager proxy objects are unable to propagate changes made to (unmanaged) mutable objects inside a container. So in other words, if you have a manager.list() object, any changes to the managed list itself are propagated to all the other processes. But if you have a normal Python list inside that list, any changes to the inner list are not propagated, because the manager has no way of detecting the change.

In order to propagate the changes, you either have to use manager.list() objects for the nested lists too (this requires Python 3.6 or newer), or you have to modify the nested object and assign it back to the manager.list() directly (which is the only option in Python 3.5 or older).

For example, consider the following code and its output:

import multiprocessing
import time

def f(ns, ls, di):
    ns.x += 1
    ns.y[0] += 1
    ns_z = ns.z
    ns_z[0] += 1
    ns.z = ns_z

    ls[0] += 1
    ls[1][0] += 1  # unmanaged, not assigned back
    ls_2 = ls[2]   # unmanaged...
    ls_2[0] += 1
    ls[2] = ls_2   # ... but assigned back
    ls[3][0] += 1  # managed, direct manipulation

    di[0] += 1
    di[1][0] += 1  # unmanaged, not assigned back
    di_2 = di[2]   # unmanaged...
    di_2[0] += 1
    di[2] = di_2   # ... but assigned back
    di[3][0] += 1  # managed, direct manipulation

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = 1
    ns.y = [1]
    ns.z = [1]
    ls = manager.list([1, [1], [1], manager.list([1])])
    di = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})

    print('before', ns, ls, ls[2], di, di[2], sep='\n')
    p = multiprocessing.Process(target=f, args=(ns, ls, di))
    p.start()
    p.join()
    print('after', ns, ls, ls[2], di, di[2], sep='\n')

Output:

before
Namespace(x=1, y=[1], z=[1])
[1, [1], [1], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[1]
{0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[1]
after
Namespace(x=2, y=[1], z=[2])
[2, [1], [2], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[2]
{0: 2, 1: [1], 2: [2], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[2]

As you can see, when a new value is assigned directly to the managed container, it changes; when it is assigned to a mutable container inside the managed container, it doesn't change; but if the mutable container is then reassigned back to the managed container, it changes again. Using a nested managed container also works: changes are detected directly, without having to assign back to the parent container.

How to make use of a multiprocessing manager within a class

To answer your questions:

Q1 - Why is this happening?

Each worker process created by Pool.map() needs to execute the instance method self.do_thing(). In order to do that, Python pickles the instance and passes it to the subprocess (which unpickles it). If each instance has its own Manager, that is a problem, because Manager objects are not picklable. Part of the unpickling process involves importing the module that defines the class and restoring the instance's attributes (which were also pickled).
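
As a quick illustration of that point (a minimal sketch, not taken from the question), a proxy such as manager.dict() pickles fine, while the Manager itself does not:

import pickle
from multiprocessing import Manager

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()

    # The proxy only carries the information needed to reconnect to the
    # manager's server process, so it can be pickled and sent to a worker.
    pickle.dumps(d)

    # The Manager itself (its server process, authkey, ...) cannot be pickled;
    # the exact exception type may vary between Python versions.
    try:
        pickle.dumps(manager)
    except Exception as exc:
        print('Manager is not picklable:', exc)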

Q2 - How to fix it

You can avoid the problem by having the class create its own class-level Manager (shared by all instances of the class). Here the __init__() method creates the manager class attribute the first time an instance is created; from that point on, further instances reuse it. This is sometimes called "lazy initialization".

from multiprocessing import Pool, Manager
import random

class SomeClass:
    def __init__(self):
        # Lazy creation of class attribute.
        try:
            manager = getattr(type(self), 'manager')
        except AttributeError:
            manager = type(self).manager = Manager()
        self.dct = manager.dict()

    def __call__(self):
        with Pool(2) as pool:
            pool.map(self.do_thing, range(10))
        print('done')

    def do_thing(self, n):
        for i in range(10_000_000):
            i += 1
        self.dct[n] = random.randint(0, 9)

if __name__ == '__main__':
    inst = SomeClass()
    inst()

Q3 - Is this a reasonable thing to do?

In my opinion, yes.

Python multiprocessing and Manager

Your code example seems to have bigger problems than form. You get your desired output only with luck; repeated execution will yield different results. That's because += is not an atomic operation. Multiple processes can read the same old value before any of them has written its update back, and they will then all write back the same value. To prevent this behaviour, you additionally have to use a Manager.Lock.


To your original question about "good form".

IMO it would be cleaner to let the main function of the child process, foo_parallel, pass global_dict explicitly into a generic function add(var). That would be a form of dependency injection, which has some advantages. In your example, non-exhaustively:

  • allows isolated testing

  • increases code reusability

  • easier debugging (detecting non-accessibility of the managed object shouldn't be delayed until add is called; fail fast)

  • less boilerplate code (for example, try/except blocks around resources that multiple functions need)

As a side note: using a list comprehension only for its side effects is considered a 'code smell'. If you don't need the list as a result, just use a for-loop, as in the short illustration below.
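
For instance (a hypothetical snippet, just to illustrate that side note, independent of the question's code):

items = [1, 2, 3]

# Code smell: a comprehension used only for its side effect; it builds and
# discards a list of None values returned by append().
doubled = []
[doubled.append(x * 2) for x in items]

# Clearer: a plain for-loop, since no list result is needed.
doubled = []
for x in items:
    doubled.append(x * 2)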

Code:

import os
from multiprocessing import Process, Manager

def add(l):
    l += [l[-1] + 1]
    return l

def foo_parallel(global_dict, lock):
    with lock:
        l = global_dict['a']
        global_dict['a'] = add(l)
        print(os.getpid(), global_dict)

if __name__ == '__main__':

    N_WORKERS = 5

    with Manager() as manager:

        lock = manager.Lock()
        global_dict = manager.dict(a=[0])

        pool = [Process(target=foo_parallel, args=(global_dict, lock))
                for _ in range(N_WORKERS)]

        for p in pool:
            p.start()

        for p in pool:
            p.join()

        print('result', global_dict)

How do read and writes work with a manager in Python?

It depends on how you write to the dictionary, i.e. whether the operation is atomic or not:

my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic

So creating a new key or updating an existing key, as in the first line of code above, is an atomic operation. But the second line of code is really multiple operations, equivalent to:

temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp

So if two processes were executing my_dict[some_key] += 1 in parallel, they could both read the same value with temp = my_dict[some_key] and increment temp to the same new value, and the net effect would be that the dictionary value only gets incremented once. This can be demonstrated as follows:

from multiprocessing import Pool, Manager, Lock

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker1(d):
    for _ in range(1000):
        with lock:
            d['x'] += 1

def worker2(d):
    for _ in range(1000):
        d['y'] += 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager, \
            Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
        d = manager.dict()
        d['x'] = 0
        d['y'] = 0
        # worker1 will serialize with a lock
        pool.apply_async(worker1, args=(d,))
        pool.apply_async(worker1, args=(d,))
        # worker2 will not serialize with a lock:
        pool.apply_async(worker2, args=(d,))
        pool.apply_async(worker2, args=(d,))
        # wait for the 4 tasks to complete:
        pool.close()
        pool.join()
        print(d)

Prints:

{'x': 2000, 'y': 1162}

Update

As far as serialization goes:

The BaseManager creates a server using, by default, a socket on Linux and a named pipe on Windows. So essentially every method you execute against a managed dictionary, for example, is pretty much a remote method call implemented with message passing. This also means that the server could be running on a different computer altogether. But these method calls are not serialized; the object methods themselves must be thread-safe, because each method call is run in a new thread.

The following is an example of creating our own managed type and having the server listen for requests, possibly from a different computer (although in this example the client runs on the same computer). The client calls increment on the managed object 1000 times across two threads, but the method implementation does not hold a lock, so the resulting value of self.x when we are all done is not 1000. Also, when we retrieve the value of x twice concurrently via get_x, we see that both invocations start more or less at the same time:

from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time

class MathManager(BaseManager):
    pass

class MathClass:
    def __init__(self, x=0):
        self.x = x

    def increment(self, y):
        temp = self.x
        time.sleep(.01)
        self.x = temp + 1

    def get_x(self):
        print(f'get_x started by thread {get_ident()}', time.time())
        time.sleep(2)
        return self.x

    def set_x(self, value):
        self.x = value

def server(event1, event2):
    MathManager.register('Math', MathClass)
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.start()
    event1.set()  # show we are started
    print('Math server running; waiting for shutdown...')
    event2.wait()  # wait for shutdown
    print("Math server shutting down.")
    manager.shutdown()

def client():
    MathManager.register('Math')
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    math = manager.Math()
    pool = ThreadPool(2)
    pool.map(math.increment, [1] * 1000)
    results = [pool.apply_async(math.get_x) for _ in range(2)]
    for result in results:
        print(result.get())

def main():
    event1 = Event()
    event2 = Event()
    t = Thread(target=server, args=(event1, event2))
    t.start()
    event1.wait()  # server started
    client()  # now we can run client
    event2.set()
    t.join()

# Required for Windows:
if __name__ == '__main__':
    main()

Prints:

Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.

How to use multiprocessing.Manager().Value to store a sum?

The multiprocessing documentation (under multiprocessing.Value) is quite explicit about this:

Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do counter.value += 1.

In short, you need to grab a lock to be able to do this.

You can do that with:

import multiprocessing

def add_to_value(addend, value, lock):
    with lock:
        value.value += addend

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        lock = manager.Lock()
        value = manager.Value(float, 0.0)
        with multiprocessing.Pool(2) as pool:
            pool.starmap(add_to_value,
                         [(float(i), value, lock) for i in range(100)])
        print(value.value)

This will correctly output 4950.0.

But note that this approach will be quite expensive due to the need for locking. Most probably, it will take more time to finish than if you have a single process doing the operation.

NOTE: I'm also adding an if __name__ == '__main__': guard, which is required when using a start method other than fork. The default on both Windows and macOS is spawn, so the guard is needed to make this code portable to either of those platforms. The spawn and forkserver start methods are also available on Linux/Unix, so in some situations the guard is needed there too.
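
If you want the start method to be explicit rather than platform-dependent, you can select it yourself; a minimal sketch (spawn is available on every platform):

import multiprocessing

if __name__ == '__main__':
    # Must be called once, before any Pool/Process/Manager is created.
    multiprocessing.set_start_method('spawn')
    with multiprocessing.Manager() as manager:
        value = manager.Value(float, 0.0)
        print(value.value)  # 0.0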

Multiprocessing will be more efficient when you're able to offload a job to workers that they can complete on their own, for example calculating partial sums in the workers and then adding them together in the main process (see the sketch below). If possible, consider rethinking your approach to fit that model.
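
A rough sketch of that pattern (hypothetical code, not from the question): each worker computes a partial sum entirely on its own, and only the small per-worker results are combined in the main process, so no shared Value and no lock are needed:

import multiprocessing

def partial_sum(chunk):
    # Each worker sums its own chunk independently; no shared state involved.
    return sum(chunk)

if __name__ == '__main__':
    numbers = list(range(100))
    # Split the work into 4 roughly equal slices.
    chunks = [numbers[i::4] for i in range(4)]
    with multiprocessing.Pool(4) as pool:
        # Combine the small per-worker results in the main process.
        total = sum(pool.map(partial_sum, chunks))
    print(total)  # 4950, the same total as the locked version above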

Using the Manager (for Pool) in the function for multiprocessing (Windows 10)

Yes, it looks like creating the Manager as a global causes problems on Windows. Move it into the module's __main__ block and pass the Namespace as a parameter.
Pool.map() allows passing only one argument to a worker, so put multiple arguments, including the namespace, into a list and pass a list of argument lists to Pool.map().

I may be wrong but I don't think you should expect/require the object ids not to change.

from multiprocessing import Pool, Manager

import numpy as np

def func(a):
    """This is a function that we want our processes to call."""
    (config, i) = a
    # You can modify the Namespace object from anywhere.
    config.z = i
    print('config is', config)
    # And it will still be shared (i.e. same id).
    print('id(config) = {:d}'.format(id(config)))

def main(config):
    """The main function containing the multiprocessing.Pool code."""
    # You can add to the Namespace object too.
    config.d = 10
    config.a = 5.25e6
    pool = Pool(1)
    pool.map(func, list([config, i] for i in range(20, 25)))
    pool.close()
    pool.join()

if __name__ == "__main__":
    # Create the manager object in the module-level namespace.
    mgr = Manager()
    # Then create a container of things that you want to share with
    # the processes as a Manager.Namespace() object.
    config = mgr.Namespace()
    # The Namespace object can hold various data types.
    config.a = 1
    config.b = '2'
    config.c = [1, 2, 3, 4]

    # Let's print the config.
    print(config)
    # Now execute main().
    main(config)
    # Again, you can add to or modify the Namespace object from anywhere.
    config.e = np.round(np.random.rand(2, 2), 2)
    config.f = range(-3, 3)
    print(config)

python multiprocessing manager.list inside manager.dict

In short, you are abusing the thread-safe dict by attempting to store a non-serializable queue in it (a queue which is thread-safe on its own). The best way to go about it is to use independent variables to store the collections created with the manager:

d = manager.dict()
q = manager.Queue()

then q.put("Starting") works. You need to pass it directly to the function or method that is going to be executed in a separate process, e.g.:

def f(d, q):
    d['a'] = 1
    q.put('a')

p = Process(target=f, args=(d, q,))

multiprocessing.Manager is supposed to be used as a provider of thread-safe collections that can be shared by threads and processes. The caveat is that objects originating from the manager cannot hold other objects that were created with it.

I recommend reading the documentation of the multiprocessing module, which is very friendly and has a lot of good examples to start with.

Problems using multiprocessing.manager

Send your args as a tuple:

import multiprocessing
def mde(dad):
    for i in range(100):
        for j in range(10):
            dad[0] = i
            dad[1] = j

def mda(dad):
    c = 0
    while c < 1001:
        print(dad)
        c += 1

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    dado = manager.list([0, 0])
    print(dado)
    p1 = multiprocessing.Process(target=mde, args=(dado,))
    p2 = multiprocessing.Process(target=mda, args=(dado,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

How to start multiprocessing network manager with multiple queues in separate process

There are a few subtleties. First, when you register with a manager a function that returns an object, the manager will by default attempt to build a proxy for that object. But the object you are returning is a managed queue, which is already a proxied object. You should therefore just return an ordinary queue.Queue instance, as in the second example in Using a Remote Manager.

The following code is split into three scripts: server.py starts up the remote manager; workers.py starts a pool of worker processes, each of which reads an integer from the work_tasks_queue and writes a tuple consisting of the integer and its square to the done_task_queue; and client.py writes 9 integers to the work_tasks_queue and then reads the 9 results from the done_task_queue, which may arrive in arbitrary order.

There seems to be a bug with authentication, and each process in the worker pool has to set its own authkey as follows, or the manager will reject its requests:

current_process().authkey = password.encode('utf-8')

Needless to say, the server, workers and client would typically (or at least, possibly) be run on 3 different machines (with an adjustment to the address specification).

Common QueueManager.py Module

from multiprocessing.managers import BaseManager

address = "127.0.0.1"
port = 50000
password = "secret"

class QueueManager(BaseManager):
    pass

def connect_to_manager():
    QueueManager.register('work_tasks_queue')
    QueueManager.register('done_task_queue')
    manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.work_tasks_queue(), manager.done_task_queue()

server.py

from QueueManager import *
from queue import Queue

work_tasks_queue = Queue()
done_task_queue = Queue()

def get_work_tasks_queue():
    return work_tasks_queue

def get_done_task_queue():
    return done_task_queue

def server():
    # Don't seem to be able to use a lambda or nested function when using net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)

    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))

    net_manager.start()
    input('Server starting. Hit Enter to terminate....')
    net_manager.shutdown()

if __name__ == '__main__':
    server()

workers.py

from QueueManager import *
from multiprocessing import Process, current_process, cpu_count
from threading import Thread

def worker(in_q, out_q):
    current_process().authkey = password.encode('utf-8')
    while True:
        x = in_q.get()
        if x is None:  # signal to terminate
            in_q.task_done()
            break
        out_q.put((x, x ** 2))
        in_q.task_done()

def create_workers(in_q, out_q, n_workers):
    processes = [Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

def start_workers():
    N_WORKERS = cpu_count()
    in_q, out_q = connect_to_manager()
    t = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    t.start()
    input('Starting workers. Hit enter to terminate...')
    for _ in range(N_WORKERS):
        in_q.put(None)  # tell worker to quit
    #in_q.join()  # not strictly necessary; assumes client's work has been completed too
    t.join()

if __name__ == '__main__':
    start_workers()

client.py

from QueueManager import *

def client():
    in_q, out_q = connect_to_manager()
    for x in range(1, 10):
        in_q.put(x)
    # get results as they become available:
    for x in range(1, 10):
        x, result = out_q.get()
        print(x, result)

if __name__ == '__main__':
    client()

Prints:

1 1
4 16
3 9
2 4
5 25
6 36
8 64
7 49
9 81

Update

Here is code to run everything together in a single script.

from QueueManager import *
from workers import create_workers
from client import client
from queue import Queue
from threading import Thread, Event

# So that queues are not unnecessarily created by worker processes under Windows:
work_tasks_queue = None
done_task_queue = None

def get_work_tasks_queue():
    global work_tasks_queue
    # singleton:
    if work_tasks_queue is None:
        work_tasks_queue = Queue()
    return work_tasks_queue

def get_done_task_queue():
    global done_task_queue
    # singleton:
    if done_task_queue is None:
        done_task_queue = Queue()
    return done_task_queue

def server(started_event, shutdown_event):
    # Don't seem to be able to use a lambda or nested function when using net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)

    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))

    net_manager.start()
    started_event.set()  # tell main thread that we have started
    shutdown_event.wait()  # wait to be told to shutdown
    net_manager.shutdown()

if __name__ == '__main__':
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event,))
    server_thread.start()
    # wait for manager to start:
    started_event.wait()
    in_q, out_q = connect_to_manager()
    N_WORKERS = 3
    workers_thread = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS,))
    workers_thread.start()
    client()
    # tell workers we are through:
    for _ in range(N_WORKERS):
        in_q.put(None)
    #in_q.join()  # not strictly necessary; assumes client's work has been completed too
    workers_thread.join()
    # tell manager we are through:
    shutdown_event.set()
    server_thread.join()

