How to use a multiprocessing.Manager()?
Manager proxy objects are unable to propagate changes made to (unmanaged) mutable objects inside a container. In other words, if you have a manager.list() object, any changes to the managed list itself are propagated to all the other processes. But if you have a normal Python list inside that list, any changes to the inner list are not propagated, because the manager has no way of detecting the change.
In order to propagate the changes, you have to use manager.list() objects for the nested lists too (requires Python 3.6 or newer), or you need to modify the manager.list() object directly (see the note on manager.list in Python 3.5 or older).
For example, consider the following code and its output:
import multiprocessing
import time


def f(ns, ls, di):
    ns.x += 1
    ns.y[0] += 1
    ns_z = ns.z
    ns_z[0] += 1
    ns.z = ns_z

    ls[0] += 1
    ls[1][0] += 1  # unmanaged, not assigned back
    ls_2 = ls[2]   # unmanaged...
    ls_2[0] += 1
    ls[2] = ls_2   # ... but assigned back
    ls[3][0] += 1  # managed, direct manipulation

    di[0] += 1
    di[1][0] += 1  # unmanaged, not assigned back
    di_2 = di[2]   # unmanaged...
    di_2[0] += 1
    di[2] = di_2   # ... but assigned back
    di[3][0] += 1  # managed, direct manipulation


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = 1
    ns.y = [1]
    ns.z = [1]
    ls = manager.list([1, [1], [1], manager.list([1])])
    di = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})

    print('before', ns, ls, ls[2], di, di[2], sep='\n')
    p = multiprocessing.Process(target=f, args=(ns, ls, di))
    p.start()
    p.join()
    print('after', ns, ls, ls[2], di, di[2], sep='\n')
Output:
before
Namespace(x=1, y=[1], z=[1])
[1, [1], [1], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[1]
{0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[1]
after
Namespace(x=2, y=[1], z=[2])
[2, [1], [2], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[2]
{0: 2, 1: [1], 2: [2], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[2]
As you can see, when a new value is assigned directly to the managed container, it changes; when it is assigned to a mutable container within the managed container, it doesn't change; but if the mutable container is then reassigned to the managed container, it changes again. Using a nested managed container also works, detecting changes directly without having to assign back to the parent container.
How to make use of a multiprocessing manager within a class
To answer your questions:
Q1 - Why is this happening?
Each worker process created by Pool.map() needs to execute the instance method self.do_thing(). In order to do that, Python pickles the instance and passes it to the subprocess, which unpickles it. Part of the unpickling process involves importing the module that defines the class and restoring the instance's attributes (which were also pickled). If each instance holds a Manager, this fails, because Manager objects are not pickleable.
Q2 - How to fix it
You can avoid the problem by having the class create its own class-level Manager (shared by all instances of the class). Here the __init__() method creates the manager class attribute the first time an instance is created, and from that point on further instances reuse it. This is sometimes called "lazy initialization":
from multiprocessing import Pool, Manager
import random


class SomeClass:
    def __init__(self):
        # Lazy creation of class attribute.
        try:
            manager = getattr(type(self), 'manager')
        except AttributeError:
            manager = type(self).manager = Manager()
        self.dct = manager.dict()

    def __call__(self):
        with Pool(2) as pool:
            pool.map(self.do_thing, range(10))
        print('done')

    def do_thing(self, n):
        for i in range(10_000_000):
            i += 1
        self.dct[n] = random.randint(0, 9)


if __name__ == '__main__':
    inst = SomeClass()
    inst()
Q3 - Is this a reasonable thing to do?
In my opinion, yes.
Python multiprocessing and Manager
Your code example seems to have bigger problems than form. You get your desired output only with luck; repeated execution will yield different results. That's because += is not an atomic operation. Multiple processes can read the same old value one after another, before any of them has updated it, and they will then all write back the same value. To prevent this behaviour, you'll have to use a Manager.Lock as well.
To your original question about "good form":
IMO it would be cleaner to let the main function of the child process, foo_parallel, pass global_dict explicitly into a generic function add(var). That would be a form of dependency injection and has some advantages. In your example, non-exhaustively:
allows isolated testing
increases code reusability
easier debugging (detecting non-accessibility of the managed object shouldn't be delayed until add is called; fail fast)
less boilerplate code (for example try-except blocks on resources that multiple functions need)
As a side note: using list comprehensions only for their side effects is considered a 'code smell'. If you don't need a list as a result, just use a for-loop.
Code:
import os
from multiprocessing import Process, Manager


def add(l):
    l += [l[-1] + 1]
    return l


def foo_parallel(global_dict, lock):
    with lock:
        l = global_dict['a']
        global_dict['a'] = add(l)
        print(os.getpid(), global_dict)


if __name__ == '__main__':
    N_WORKERS = 5

    with Manager() as manager:
        lock = manager.Lock()
        global_dict = manager.dict(a=[0])

        pool = [Process(target=foo_parallel, args=(global_dict, lock))
                for _ in range(N_WORKERS)]

        for p in pool:
            p.start()
        for p in pool:
            p.join()

        print('result', global_dict)
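To illustrate the "isolated testing" point: because add receives its list as a parameter, it can be exercised with a plain Python list, with no manager or child process involved. A minimal sketch, where the test function name is hypothetical and not part of the original answer:

```python
def add(l):
    """Append the successor of the last element (same function as above)."""
    l += [l[-1] + 1]
    return l


def test_add_with_plain_list():
    # A plain list stands in for the value stored in the managed dict.
    assert add([0]) == [0, 1]
    assert add([0, 1]) == [0, 1, 2]


if __name__ == '__main__':
    test_add_with_plain_list()
    print('add() works without a manager')
```

Had add reached for a global managed dict instead, the same check would need a running manager process.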
How do reads and writes work with a manager in Python?
It depends on how you write to the dictionary, i.e. whether the operation is atomic or not:
my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic
So creating a new key and updating an existing key, as in the first line of code above, are atomic operations. But the second line of code is really multiple operations, equivalent to:
temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp
So if two processes were executing my_dict[some_key] += 1 in parallel, they could both read the same value with temp = my_dict[some_key] and increment temp to the same new value; the net effect would be that the dictionary value only gets incremented once. This can be demonstrated as follows:
from multiprocessing import Pool, Manager, Lock


def init_pool(the_lock):
    global lock
    lock = the_lock


def worker1(d):
    for _ in range(1000):
        with lock:
            d['x'] += 1


def worker2(d):
    for _ in range(1000):
        d['y'] += 1


if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager, \
            Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
        d = manager.dict()
        d['x'] = 0
        d['y'] = 0
        # worker1 will serialize with a lock
        pool.apply_async(worker1, args=(d,))
        pool.apply_async(worker1, args=(d,))
        # worker2 will not serialize with a lock:
        pool.apply_async(worker2, args=(d,))
        pool.apply_async(worker2, args=(d,))
        # wait for the 4 tasks to complete:
        pool.close()
        pool.join()
        print(d)
Prints:
{'x': 2000, 'y': 1162}
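The shortfall in 'y' comes from interleaved read-modify-write steps. The sketch below replays one such interleaving by hand in a single process, so the outcome is deterministic. It is only an illustration: a plain dict stands in for the managed dictionary, and the function names are made up for this example.

```python
def lost_update_trace():
    """Replay two workers running d['y'] += 1 with their steps interleaved."""
    d = {'y': 0}
    temp_a = d['y']      # worker A reads 0
    temp_b = d['y']      # worker B reads 0 before A has written back
    d['y'] = temp_a + 1  # worker A writes 1
    d['y'] = temp_b + 1  # worker B also writes 1, overwriting A's update
    return d['y']


def locked_trace():
    """With a lock, each read-modify-write finishes before the next starts."""
    d = {'y': 0}
    for _ in range(2):   # worker A, then worker B
        temp = d['y']
        d['y'] = temp + 1
    return d['y']


if __name__ == '__main__':
    print(lost_update_trace())  # 1: one increment was lost
    print(locked_trace())       # 2: both increments survive
```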
Update
As far as serialization goes:
The BaseManager creates a server using, by default, a socket on Linux and a named pipe on Windows. So essentially every method you execute against a managed dictionary, for example, is pretty much like a remote method call implemented with message passing. This also means that the server could be running on a different computer altogether. But these method calls are not serialized; the object methods themselves must be thread-safe because each method call is run in a new thread.
The following is an example of creating our own managed type and having the server listen for requests, possibly from a different computer (although in this example the client is running on the same computer). The client calls increment on the managed object 1000 times across two threads, but the method implementation is not done under a lock, and so the resulting value of self.x when we are all done is not 1000. Also, when we retrieve the value of x twice concurrently with method get_x, we see that both invocations start up more or less at the same time:
from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time


class MathManager(BaseManager):
    pass


class MathClass:
    def __init__(self, x=0):
        self.x = x

    def increment(self, y):
        temp = self.x
        time.sleep(.01)
        self.x = temp + 1

    def get_x(self):
        print(f'get_x started by thread {get_ident()}', time.time())
        time.sleep(2)
        return self.x

    def set_x(self, value):
        self.x = value


def server(event1, event2):
    MathManager.register('Math', MathClass)
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.start()
    event1.set()  # show we are started
    print('Math server running; waiting for shutdown...')
    event2.wait()  # wait for shutdown
    print("Math server shutting down.")
    manager.shutdown()


def client():
    MathManager.register('Math')
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    math = manager.Math()
    pool = ThreadPool(2)
    pool.map(math.increment, [1] * 1000)
    results = [pool.apply_async(math.get_x) for _ in range(2)]
    for result in results:
        print(result.get())


def main():
    event1 = Event()
    event2 = Event()
    t = Thread(target=server, args=(event1, event2))
    t.start()
    event1.wait()  # server started
    client()  # now we can run client
    event2.set()
    t.join()


# Required for Windows:
if __name__ == '__main__':
    main()
Prints:
Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.
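One way to make such a class safe is to have its methods hold a threading.Lock. The sketch below is a hypothetical variant (SafeMath and run_increments are names invented here) that is driven directly from a thread pool instead of through a manager, to keep it short. The assumption is that the same class could be registered with a BaseManager subclass, since the instance and its lock would then live in the server process.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


class SafeMath:
    """Hypothetical variant of MathClass whose methods hold a lock."""

    def __init__(self, x=0):
        self.x = x
        self._lock = threading.Lock()

    def increment(self, y):
        # The read, the pause and the write now form one critical section.
        with self._lock:
            temp = self.x
            time.sleep(.001)
            self.x = temp + 1  # mirrors the original, which ignores y

    def get_x(self):
        with self._lock:
            return self.x


def run_increments(n_calls=100, n_threads=2):
    """Call increment from several threads, much as a manager server would."""
    math = SafeMath()
    with ThreadPool(n_threads) as pool:
        pool.map(math.increment, [1] * n_calls)
    return math.get_x()


if __name__ == '__main__':
    print(run_increments())  # 100
```

The price is that concurrent calls to increment now run one at a time, which is exactly the serialization the unlocked version lacked.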
How to use multiprocessing.Manager().Value to store a sum?
The multiprocessing documentation (under multiprocessing.Value) is quite explicit about this:
Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do counter.value += 1.
In short, you need to grab a lock to be able to do this.
You can do that with:
import multiprocessing


def add_to_value(addend, value, lock):
    with lock:
        value.value += addend


if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        lock = manager.Lock()
        value = manager.Value(float, 0.0)
        with multiprocessing.Pool(2) as pool:
            pool.starmap(add_to_value,
                         [(float(i), value, lock) for i in range(100)])
        print(value.value)
This will correctly output 4950.0.
But note that this approach will be quite expensive due to the need for locking. Most probably, it will take more time to finish than if you have a single process doing the operation.
NOTE: I'm also adding an if __name__ == '__main__': guard, which is actually required when using a start method other than fork. The default on both Windows and macOS is spawn, so that's really needed to make this code portable to either of those platforms. Start methods spawn and forkserver are also available on Linux/Unix, so in some situations this is also needed there.
Multiprocessing will be more efficient when you're able to offload a job to workers that they can complete on their own, for example calculate partial sums and then add them together in the main process. If possible, consider rethinking your approach to fit that model.
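As a rough sketch of that partial-sums model (the helper names partial_sum and make_chunks are invented for this example), each worker sums its own slice of the range without touching shared state, and the main process adds up the few results:

```python
from multiprocessing import Pool


def partial_sum(bounds):
    """Sum the integers in [start, stop) entirely inside one worker."""
    start, stop = bounds
    return float(sum(range(start, stop)))


def make_chunks(n, n_chunks):
    """Split range(n) into contiguous (start, stop) pairs."""
    step = -(-n // n_chunks)  # ceiling division
    return [(i, min(i + step, n)) for i in range(0, n, step)]


if __name__ == '__main__':
    with Pool(2) as pool:
        partials = pool.map(partial_sum, make_chunks(100, 4))
    print(sum(partials))  # 4950.0
```

No lock or manager is involved here; the only inter-process traffic is one task and one result per chunk.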
Using the Manager (for Pool) in the function for multiprocessing (Windows 10)
Yes, it looks like creating Manager as a global causes problems on Windows. Move it to the module main and pass the Namespace as a parameter.
Pool.map() allows passing only one argument to a worker, so put multiple arguments, including the namespace, into a list. Pass a list of argument lists to Pool.map().
I may be wrong but I don't think you should expect/require the object ids not to change.
from multiprocessing import Pool, Manager
import numpy as np


def func(a):
    """This is a function that we want our processes to call."""
    (config, i) = a
    # You can modify the Namespace object from anywhere.
    config.z = i
    print('config is', config)
    # And they will still be shared (i.e. same id).
    print('id(config) = {:d}'.format(id(config)))


# This main func
def main(config):
    """The main function contains the multiprocessing.Pool code."""
    # You can add to the Namespace object too.
    config.d = 10
    config.a = 5.25e6
    pool = Pool(1)
    pool.map(func, list([config, i] for i in range(20, 25)))
    pool.close()
    pool.join()


if __name__ == "__main__":
    # Create manager object in module-level namespace
    mgr = Manager()
    # Then create a container of things that you want to share to
    # processes as Manager.Namespace() object.
    config = mgr.Namespace()
    # The Namespace object can take various data types
    config.a = 1
    config.b = '2'
    config.c = [1, 2, 3, 4]

    # Let's print the config
    print(config)

    # Now executing main()
    main(config)

    # Again, you can add or modify the Namespace object from anywhere.
    config.e = np.round(np.random.rand(2, 2), 2)
    config.f = range(-3, 3)
    print(config)
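Packing the arguments into a list is one way around the single-argument limit of Pool.map(). Two alternatives from the standard library are Pool.starmap(), which unpacks each argument tuple, and functools.partial, which binds the shared argument up front. A minimal sketch, assuming a read-only worker and using types.SimpleNamespace as a stand-in for mgr.Namespace() to keep the example small:

```python
from functools import partial
from multiprocessing import Pool
from types import SimpleNamespace


def func(config, i):
    """Worker taking the shared config and one work item separately."""
    return config.a + i


if __name__ == '__main__':
    # SimpleNamespace is only a stand-in for mgr.Namespace() here.
    config = SimpleNamespace(a=1)
    with Pool(2) as pool:
        # Option 1: starmap unpacks each tuple into positional arguments.
        print(pool.starmap(func, [(config, i) for i in range(20, 25)]))
        # Option 2: partial binds config, leaving one argument for map.
        print(pool.map(partial(func, config), range(20, 25)))
```

Both calls print [21, 22, 23, 24, 25]. Unlike a manager Namespace, changes a worker makes to a SimpleNamespace would not be visible to other processes.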
python multiprocessing manager.list inside manager.dict
In short, you are abusing the thread-safe dict by attempting to store a non-serializable queue in it (the queue is thread-safe on its own). The best way to go about it is to use independent variables to store the collections created with manager:
d = manager.dict()
q = manager.Queue()
then q.put("Starting") works. You need to pass it directly to the function or method that is going to be executed in a separate process, e.g.:
def f(d, q):
    d['a'] = 1
    q.put('a')

p = Process(target=f, args=(d, q))
multiprocessing.Manager is supposed to be used as the provider of thread-safe collections that can be shared by threads and processes. The caveat is that, at least before Python 3.6 added support for nested shared objects, objects originating from the manager cannot hold other objects that were created using it.
I recommend reading the documentation of the multiprocessing module, which is very friendly and has a lot of good examples to start with.
Problems using multiprocessing.manager
Send your args as a tuple:
import multiprocessing


def mde(dad):
    for i in range(100):
        for j in range(10):
            dad[0] = i
            dad[1] = j


def mda(dad):
    c = 0
    while c < 1001:
        print(dad)
        c += 1


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    dado = manager.list([0, 0])
    print(dado)
    p1 = multiprocessing.Process(target=mde, args=(dado,))
    p2 = multiprocessing.Process(target=mda, args=(dado,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
How to start multiprocessing network manager with multiple queues in separate process
There are a few subtleties. First, when you register with a manager a function that returns an object, the manager by default will attempt to build a proxy for that object. But the object you are returning is a managed queue that is already a proxied object. You should therefore just be returning an ordinary queue instance, as in the second example in Using a Remote Manager.
The following code is split into three scripts. server.py starts the remote manager. workers.py starts a pool of worker processes, each of which reads an integer from work_tasks_queue and writes a tuple consisting of the integer and its square to done_task_queue. client.py writes 9 integers to work_tasks_queue and then reads the 9 results from done_task_queue, which may arrive in arbitrary order.
There seems to be a bug with authentication, and it becomes necessary for each process in the process pool to set its own authkey as follows, or the manager will reject requests:
current_process().authkey = password.encode('utf-8')
Needless to say, the server, workers and client would typically (or at least, possibly) be run on 3 different machines (with an adjustment to the address specification).
Common QueueManager.py Module
from multiprocessing.managers import BaseManager

address = "127.0.0.1"
port = 50000
password = "secret"


class QueueManager(BaseManager):
    pass


def connect_to_manager():
    QueueManager.register('work_tasks_queue')
    QueueManager.register('done_task_queue')
    manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.work_tasks_queue(), manager.done_task_queue()
server.py
from QueueManager import *
from queue import Queue

work_tasks_queue = Queue()
done_task_queue = Queue()


def get_work_tasks_queue():
    return work_tasks_queue


def get_done_task_queue():
    return done_task_queue


def server():
    # Don't seem to be able to use a lambda or nested function when using net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    input('Server starting. Hit Enter to terminate....')
    net_manager.shutdown()


if __name__ == '__main__':
    server()
workers.py
from QueueManager import *
from multiprocessing import Process, current_process, cpu_count
from threading import Thread


def worker(in_q, out_q):
    current_process().authkey = password.encode('utf-8')
    while True:
        x = in_q.get()
        if x is None:  # signal to terminate
            in_q.task_done()
            break
        out_q.put((x, x ** 2))
        in_q.task_done()


def create_workers(in_q, out_q, n_workers):
    processes = [Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()


def start_workers():
    N_WORKERS = cpu_count()
    in_q, out_q = connect_to_manager()
    t = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    t.start()
    input('Starting workers. Hit enter to terminate...')
    for _ in range(N_WORKERS):
        in_q.put(None)  # tell worker to quit
    #in_q.join() # not strictly necessary; assumes client's work has been completed too
    t.join()


if __name__ == '__main__':
    start_workers()
client.py
from QueueManager import *


def client():
    in_q, out_q = connect_to_manager()
    for x in range(1, 10):
        in_q.put(x)
    # get results as they become available:
    for x in range(1, 10):
        x, result = out_q.get()
        print(x, result)


if __name__ == '__main__':
    client()
Prints:
1 1
4 16
3 9
2 4
5 25
6 36
8 64
7 49
9 81
Update
Here is code to run everything all together.
from QueueManager import *
from workers import create_workers
from client import client
from queue import Queue
from threading import Thread, Event

# So that queues are not unnecessarily created by worker processes under Windows:
work_tasks_queue = None
done_task_queue = None


def get_work_tasks_queue():
    global work_tasks_queue
    # singleton:
    if work_tasks_queue is None:
        work_tasks_queue = Queue()
    return work_tasks_queue


def get_done_task_queue():
    global done_task_queue
    # singleton:
    if done_task_queue is None:
        done_task_queue = Queue()
    return done_task_queue


def server(started_event, shutdown_event):
    # Don't seem to be able to use a lambda or nested function when using net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    started_event.set()  # tell main thread that we have started
    shutdown_event.wait()  # wait to be told to shutdown
    net_manager.shutdown()


if __name__ == '__main__':
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event,))
    server_thread.start()
    # wait for manager to start:
    started_event.wait()
    in_q, out_q = connect_to_manager()
    N_WORKERS = 3
    workers_thread = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS,))
    workers_thread.start()
    client()
    # tell workers we are through:
    for _ in range(N_WORKERS):
        in_q.put(None)
    #in_q.join() # not strictly necessary; assumes client's work has been completed too
    workers_thread.join()
    # tell manager we are through:
    shutdown_event.set()
    server_thread.join()