Multiprocessing: How to Use Pool.Map on a Function Defined in a Class

Multiprocessing: How to use Pool.map on a function defined in a class?

I also was annoyed by restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.

from multiprocessing import Process, Pipe

def spawn(f):
    # Wrap f so the child process sends its result back through a pipe.
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    # One pipe and one process per element of X.
    pipes = [Pipe() for _ in X]
    procs = [Process(target=spawn(f), args=(child, x))
             for x, (parent, child) in zip(X, pipes)]
    [p.start() for p in procs]
    [p.join() for p in procs]
    return [parent.recv() for (parent, child) in pipes]

if __name__ == '__main__':
    print(parmap(lambda x: x ** x, range(1, 5)))
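
Note that joining each process before receiving can deadlock if a child sends more data than the pipe buffer can hold, since send() blocks until the parent reads. A minimal variant (my sketch, not part of the original answer) that receives the results before joining avoids this:

def parmap_safe(f, X):
    # Same idea as parmap above, but read the results before joining so a
    # full pipe buffer cannot block a child process indefinitely.
    pipes = [Pipe() for _ in X]
    procs = [Process(target=spawn(f), args=(child, x))
             for x, (parent, child) in zip(X, pipes)]
    [p.start() for p in procs]
    results = [parent.recv() for (parent, child) in pipes]
    [p.join() for p in procs]
    return results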

use multiprocessing.Pool.map in a class

First let's make the printout a bit more orderly by adding flush=True to the print call, so that each process's output is written out immediately and lines from different processes are not held in a buffer and mixed together:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1] * 30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:

i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0

Analysis

Now let's analyze what is happening. The creation of a = Acc() is done by the main process. The multiprocessing pool's processes execute in a different address space, so when they execute your worker function, self.run, object a must be serialized/de-serialized (pickled/unpickled) into the address space of the process that will be executing the worker function. In that new address space self.count comes across with its initial value of 0, which is printed, and is then incremented to 1 and returned. Meanwhile, in parallel, object a is being serialized/de-serialized 3 more times so that 3 other processes can do the same processing; they, too, print 0 and return the value 1. But since all of this incrementing occurs on copies of a that exist in address spaces other than the main process's, the original a in the main process remains unmodified. So as the map function continues to execute and a is copied again from the main process to the processing pool, it always arrives with self.count = 0.
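
A minimal sketch of why this happens: pickling the bound method self.run also pickles the instance it is bound to, so each worker ends up operating on its own copy of a (the names below are only for illustration):

import pickle

class Acc:
    def __init__(self):
        self.count = 0

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
payload = pickle.dumps(a.run)       # roughly what the pool sends to a worker
a_copy_run = pickle.loads(payload)  # the worker's unpickled bound method
a_copy_run(1)
print(a_copy_run.__self__.count)    # 1 -- the copy was incremented
print(a.count)                      # 0 -- the original is untouched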

Then the question becomes: why is i = 1 sometimes printed instead of i = 0?

When you execute map with an iterable of 30 elements, as you are doing here, these 30 tasks are divided into "chunks" based on the chunksize argument that you provide. Since we took the default chunksize=None, the map function computes a default chunksize value based on the length of the iterable and the pool size:

chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1

In this case the pool size was 4, so the chunksize would have been computed to be 2. That means that each process in the multiprocessing pool takes tasks off the task queue two at a time, so the same unpickled copy of the object handles two tasks in a row: the first task prints 0 and increments the copy's count, and the second task on that same copy prints 1.
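
For the values in this example, the same computation works out as follows (a small sketch of the arithmetic):

iterable_len = 30
pool_size = 4

chunksize, remainder = divmod(iterable_len, 4 * pool_size)  # divmod(30, 16) -> (1, 14)
if remainder:
    chunksize += 1
print(chunksize)  # 2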

If we specify a chunksize of 1, so that each task is sent to the pool with its own fresh copy of the object, then we have:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1] * 30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:

i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0

And if we specify a chunksize of 30 so that a single process is processing all of the tasks against a single object:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1] * 30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:

i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0

In this last case, of course, no multiprocessing occurred since a single process of the multiprocessing pool processed all the submitted tasks.
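
If the goal is actually to accumulate a total in the main process, the usual pattern is to aggregate the values returned by the workers rather than expect worker-side mutations to persist. A minimal sketch:

from multiprocessing import Pool

def work(i):
    # Any per-task computation; here we just pass the value through.
    return i

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = pool.map(work, [1] * 30)
    total = sum(results)  # aggregate in the main process
    print(total)          # 30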

How to use multiprocessing pool.map with multiple arguments

The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.1 It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.

class variable becomes empty in multiprocessing pool.map

The problem seems to be as follows:

The class data created in your main process must be serialized/de-serialized using pickle so that it can be passed from the main process's address space to the address spaces of the processes in the multiprocessing pool that need to work with these objects. But the class data in question is an instance of class Parent, since you are calling one of its methods, i.e. evaluate_without_prefix. Nowhere in that instance is there a reference to class Util, or anything else that would cause the multiprocessing pool to serialize the Util class along with the Parent instance. Consequently, when that method references class Util in any of the pool's processes, a new Util is created and, of course, its dictionary is not initialized.

I think the simplest change is to:

  1. Make attribute _raw_data an instance attribute rather than a class attribute (by the way, according to your current usage, there is no need for this to be a defaultdict).
  2. Create an instance of the Util class (Utils in the code below) named utils and initialize the dictionary via this reference.
  3. Use the initializer and initargs arguments of the multiprocessing.Pool constructor to initialize each process in the multiprocessing pool with a global variable named utils that will be a copy of the utils instance created by the main process.

So I would organize the code along the following lines:

import itertools
import ipaddress
import os
from multiprocessing import Pool

class Utils:

    def __init__(self):
        self._raw_data = {}

    def raw_data(self):
        # No need to make a copy ???
        return self._raw_data.copy()

    def set_raw_data(self, key, data):
        self._raw_data[key] = data

def init_processes(utils_instance):
    """
    Initialize each process in the process pool with global variable utils.
    """
    global utils
    utils = utils_instance

class Parent:
    ...
    def evaluate_without_prefix(self, devices):
        results = []
        print(utils.raw_data())
        for network1, network2 in itertools.product(utils.raw_data()[devices[0]],
                                                     utils.raw_data()[devices[1]]):
            results.append([network1, network2])
        return results

class Child(Parent):
    ...
    def execute(self, utils):
        pool = Pool(os.cpu_count() - 1, initializer=init_processes, initargs=(utils,))
        # No need to make an explicit list (map will do that for you) ???
        devices = list(itertools.combinations(list(utils.raw_data().keys()), 2))
        results = pool.map(super().evaluate_without_prefix, devices)
        return results

def main():
    utils = Utils()
    # Initialize utils:
    ...
    data = [ipaddress.IPv4Network(address) for address in ip_addresses]
    utils.set_raw_data(device_name, data)

    child = Child()
    results = child.execute(utils)

if __name__ == '__main__':
    main()

Further Explanation

The following program's main process calls class method Foo.set_x to update class attribute x to the value of 10 before creating a multiprocessing pool and invoking worker function worker, which prints out the value of Foo.x.

On Windows, which uses the spawn method to create new processes, each pool process is initialized, prior to calling the worker function, essentially by launching a new Python interpreter and re-importing the source program, executing every statement at global scope. Hence the class definition of Foo is re-created by the interpreter compiling it; there is no pickling involved. But because Foo.set_x(10) only executes inside the if __name__ == '__main__': block of the main process and is never run on re-import, Foo.x will be 0 in the pool process.

The same program run on Linux, which uses fork to create new processes, gives each pool process a copy-on-write view of the main process's address space. Each pool process therefore has a copy of the Foo class as it existed at the time the multiprocessing pool was created, and Foo.x will be 10.

My solution above, which uses a pool initializer to set a global variable in each pool process's address space to the value of the Util instance, is what is required on Windows and works on Linux as well. An alternative, of course, is to pass the Util instance as an additional argument to your worker function instead of using a pool initializer, but this is generally less efficient: the number of processes in the pool is usually smaller than the number of times the worker function is invoked, so the pool-initializer approach requires less pickling.

from multiprocessing import Pool

class Foo:
    x = 0

    @classmethod
    def set_x(cls, x):
        cls.x = x

def worker():
    print(Foo.x)


if __name__ == '__main__':
    Foo.set_x(10)
    pool = Pool(1)
    pool.apply(worker)
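
For comparison, the less efficient alternative mentioned above (passing the instance as an extra argument to each task) might look like the following sketch; the Utils stand-in and the sample data are illustrative, not from the original question:

from itertools import repeat
from multiprocessing import Pool

# Stand-in for the Utils instance from the earlier example (illustrative only).
class Utils:
    def __init__(self, raw_data):
        self._raw_data = raw_data

    def raw_data(self):
        return self._raw_data

def worker(utils, device):
    # utils travels with every submitted task, so it is pickled repeatedly
    # instead of once per pool process as with an initializer.
    return utils.raw_data()[device]

if __name__ == '__main__':
    utils = Utils({'router': [1, 2], 'switch': [3, 4]})
    devices = ['router', 'switch']
    with Pool(2) as pool:
        results = pool.starmap(worker, zip(repeat(utils), devices))
    print(results)  # [[1, 2], [3, 4]]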

How to call a function that is inside another function using pool multiprocessing?

AttributeError: 'function' object has no attribute 'map'

We need to instantiate Pool from multiprocessing and call the map method of that pool object.

You have to move the inside function into a class (or to module level) because Pool uses pickle to serialize and deserialize the callables it sends to its workers, and a function nested inside another function cannot be imported by pickle.

Pool needs to pickle (serialize) everything it sends to its worker processes (IPC). Pickling actually only saves the name of a function, and unpickling requires re-importing the function by name. For that to work, the function needs to be defined at the top level; nested functions won't be importable by the child, and merely trying to pickle them already raises an exception.
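
A quick way to see that pickling failure for yourself (a minimal sketch):

import pickle

def outer():
    def inside(a):
        return a
    return inside

try:
    pickle.dumps(outer())  # a nested (local) function
except (pickle.PicklingError, AttributeError) as e:
    print('cannot pickle nested function:', e)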


from multiprocessing import Pool

class Wrap:
    def inside(self, a):
        print(a)

def main():
    pool = Pool()
    pool.map(Wrap().inside, 'Ok' * 10)

if __name__ == '__main__':
    main()

If you don't want to wrap the inside method in a class, move it to global (module-level) scope so it can be pickled:

from multiprocessing import Pool

def inside(a):
    print(a)

def main():
    with Pool() as pool:
        pool.map(inside, 'Ok' * 10)

if __name__ == '__main__':
    main()

Multiprocessing: Passing a class instance to pool.map

At the gentle and patient prodding of martineau (thanks!) I think I've ironed out the problems. I have yet to apply it to my original code, but it is working in the example above and I'll start new questions for future implementation problems.

So in addition to changing where in the code the target file (the log, in this example) gets opened, I also started the QueueWriter instance as a single multiprocessing Process rather than using pool.map. As martineau pointed out, the map call blocks until qWriter.__call__() returns, and this prevented the workers from being called.

There were some other bugs in the code above, but those were incidental and fixed below:

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs):
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print(self.path)
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")
        while 1:
            res = q.get()
            if res == 'kill':
                log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(idx)
        self.queue.put((idx, res))
        return res

    def workhorse(self, idx):
        # in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
    # log = open(os.path.expanduser('~/minimal.log'), 'w', 1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in range(10000)]
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
    # watcher = pool.map(qWriter, (q,), 1)
    # Start the writer as a single process rather than a pool
    p = mp.Process(target=qWriter, args=(q,))
    p.start()
    wrkr = Worker(queue=q, grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    # result.get()
    # not required for pool
    q.put('kill')
    pool.close()
    p.join()
    pool.join()

Pool within a Class in Python

It looks like, because of the way the function gets passed to the worker processes (pickling), you can't use instance methods, unfortunately. My first thought was to use lambdas, but it turns out the built-in pickler can't serialize those either. The solution, sadly, is just to use a function in the global namespace. As suggested in other answers, you can use static methods and pass self explicitly to make it look more like an instance method.

from multiprocessing import Pool
from itertools import repeat

class SeriesInstance(object):
    def __init__(self):
        self.numbers = [1, 2, 3]

    def run(self):
        p = Pool()
        squares = p.map(self.F, self.numbers)
        multiples = p.starmap(self.G, zip(repeat(self), [2, 5, 10]))
        return (squares, multiples)

    @staticmethod
    def F(x):
        return x * x

    @staticmethod
    def G(self, m):
        return [m * n for n in self.numbers]

if __name__ == '__main__':
    print(SeriesInstance().run())
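
Note that on Python 3 the default pickler can serialize bound methods, so the plain instance-method form also works there; a minimal sketch of the same example:

from multiprocessing import Pool

class SeriesInstance:
    def __init__(self):
        self.numbers = [1, 2, 3]

    def square(self, x):
        return x * x

    def run(self):
        with Pool() as p:
            # Bound methods pickle on Python 3, so this works directly;
            # each worker still operates on its own copy of the instance.
            return p.map(self.square, self.numbers)

if __name__ == '__main__':
    print(SeriesInstance().run())  # [1, 4, 9]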

use multiprocessing pool to call a method of a class which has parametrized constructor

An easy way to do it is to create a module-level function that takes care of the instantiation as well as the call:

def mp_func(user):
    a = A(user)
    a.api_call()
and call that function in your pool.
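
For example, assuming the A class defined just below and a list of usernames, something along these lines:

import multiprocessing as mp

if __name__ == '__main__':
    usernames = ['a', 'b', 'c'] * 20
    with mp.Pool(4) as pool:
        pool.map(mp_func, usernames)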

If you really want it to work with a function from your class, you can use a classmethod:

import multiprocessing as mp

class A(object):
    def __init__(self, user):
        self.user = user

    def api_call(self):
        print(self.user)

    @classmethod
    def mp_create_and_call(cls, user):
        newcls = cls(user)
        newcls.api_call()

if __name__ == '__main__':
    pool = mp.Pool(4)
    usernames = ['a', 'b', 'c'] * 20
    pool.map(A.mp_create_and_call, usernames)

How to properly reference to instances of a class in Multiprocessing Pool.map?

After studying the multiprocessing documentation, I understood where I had misinterpreted the concept.

With multiprocessing, even if an instance of a class is passed as an argument, it makes sense that its id is different from the one in the calling method: we are now working in a different process altogether, so the object is a copy of the original and does not live at the same place in memory. Because of that, whatever changes are made to the copy have no impact on the original instance.

In order to use parallelism and share state, a different concept must be applied: multithreading, as described in the thread-based parallelism documentation. The difference between multithreading and multiprocessing has been thoroughly discussed here: Multiprocessing vs Threading Python

Returning to the original question, there are two easy ways to loop through the list and apply the function:

1. Using multiprocessing.dummy:

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

So the answer could be written as:

import multiprocessing.dummy as mp
p = mp.Pool(3) # With 3 being the number of threads.
p.map(Kill_Animal, AnimalsList)
p.close()
p.join()

[print(animal.isAlive) for animal in AnimalsList]

Output: False False False False False

2. Using a Queue:

from queue import Queue
from threading import Thread

# The hunter thread: takes animals off the queue and processes them.
def hunter():
    while True:
        animal = q.get()
        Kill_Animal(animal)
        q.task_done()

num_hunter_threads = 3
q = Queue()

# Initialize the threads.
for i in range(num_hunter_threads):
    t = Thread(target=hunter)
    t.daemon = True
    t.start()

# Add each animal in the list to the Queue.
for animal in AnimalsList:
    q.put(animal)

# Wait until all the jobs in the queue have been processed.
q.join()

[print(animal.isAlive) for animal in AnimalsList]

Output: False False False False False


