Multiprocessing: How to use Pool.map on a function defined in a class?
I was also annoyed by the restrictions on the sort of functions pool.map can accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.
from multiprocessing import Process, Pipe

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for _ in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in zip(X, pipe)]
    [p.start() for p in proc]
    # Receive before joining: if a child's payload exceeded the pipe buffer,
    # joining first would deadlock while the child blocks in send().
    ret = [p.recv() for (p, c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print(parmap(lambda x: x**x, range(1, 5)))
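Note that on Python 3 this workaround is often unnecessary: bound methods pickle by reference (the instance plus the method name), so as long as the instance itself is picklable, Pool.map can target an instance method directly. A minimal sketch (the Calculator class is hypothetical, for illustration only):

```python
from multiprocessing import Pool

class Calculator:
    def __init__(self, exponent):
        self.exponent = exponent

    def compute(self, x):
        return x ** self.exponent

if __name__ == '__main__':
    # The bound method calc.compute pickles as (calc, 'compute'),
    # so the pool workers can reconstruct and call it.
    calc = Calculator(2)
    with Pool(4) as pool:
        print(pool.map(calc.compute, range(1, 5)))  # [1, 4, 9, 16]
```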
use multiprocessing.Pool.map in a class
First let's make the printout a bit more orderly by adding flush=True to the print call so that each print output occupies its own line:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)
Prints:
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0
Analysis
Now let's analyze what is happening. The creation of a = Acc() is done by the main process. The multiprocessing pool's processes execute in a different address space, so when they execute your worker function, self.run, object a must be serialized/de-serialized to the address space of the process that will be executing the worker function. In that new address space self.count comes across with the initial value of 0, which is printed, and is then incremented to 1 and returned. Meanwhile, in parallel, object a is being serialized/de-serialized 3 more times so 3 other processes can do the same processing; they, too, print 0 and return the value 1. But since all this incrementing occurs on copies of a that exist in address spaces other than the main process's, the original a in the main process remains unmodified. So as the map function continues to execute and a is further copied from the main process to the processing pool, it always arrives with self.count = 0.
Then the question becomes: why is i = 1 sometimes printed instead of i = 0?
When you execute map with an iterable of 30 elements, as you are doing here, these 30 tasks are by default divided into "chunks" based on the chunksize argument that you provide. Since we took the default chunksize=None, the map function computes a default chunksize value based on the length of the iterable and the pool size:
chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1
In this case the pool size was 4, so the chunksize would have been computed to be 2. That means each process in the multiprocessing pool takes tasks off the task queue two at a time, so a single deserialized copy of a processes two tasks in a row with different values of i (which is ignored); the second of those tasks sees the increment made by the first and prints 1.
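The arithmetic above is easy to check directly. Here it is wrapped in a small helper (default_chunksize is a hypothetical name for illustration; the real computation lives inside multiprocessing.pool):

```python
# Hypothetical helper reproducing the default chunksize arithmetic
# used by Pool.map when chunksize=None:
def default_chunksize(n_tasks, pool_size):
    chunksize, remainder = divmod(n_tasks, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

print(default_chunksize(30, 4))  # 2: divmod(30, 16) = (1, 14), remainder bumps it to 2
```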
If we specify a chunksize of 1, so that each process only processes the object one at a time, then we have:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)
Prints:
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0
And if we specify a chunksize of 30 so that a single process is processing all of the tasks against a single object:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)
Prints:
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0
In this last case, of course, no multiprocessing occurred since a single process of the multiprocessing pool processed all the submitted tasks.
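Since each worker only ever mutates its own copy, the idiomatic fix is to return the per-task values and reduce them in the main process. A minimal sketch of the Acc example rewritten that way:

```python
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def run(self, i):
        # Each worker gets its own copy of self; just return the increment.
        return i

    def multiprocess(self):
        with Pool(processes=4) as pool:
            # Reduce the returned values in the main process:
            self.count = sum(pool.map(self.run, [1] * 30))

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)  # a.count = 30
```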
How to use multiprocessing pool.map with multiple arguments
The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.1 It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:
import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager, since Pool only became one in Python 3.3. (Thanks to muon for pointing this out.)
import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.
class variable becomes empty in multiprocessing pool.map
The problem seems to be as follows:
The class data created in your main process must be serialized/de-serialized using pickle so that it can be passed from the main process's address space to the address spaces of the processes in the multiprocessing pool that need to work with these objects. The class data in question is an instance of class Parent, since you are calling one of its methods, i.e. evaluate_without_prefix. But nowhere in that instance is there a reference to class Util, or anything else that would cause the multiprocessing pool to serialize the Util class along with the Parent instance. Consequently, when that method references class Util in any of the processes, a new Util will be created and, of course, it will not have its dictionary initialized.
I think the simplest changes are to:
- Make attribute _raw_data an instance attribute rather than a class attribute (by the way, according to your current usage, there is no need for this to be a defaultdict).
- Create an instance of class Utils named utils and initialize the dictionary via this reference.
- Use the initializer and initargs arguments of the multiprocessing.Pool constructor so that each process in the multiprocessing pool gets a global variable named utils that is a copy of the utils instance created by the main process.
So I would organize the code along the following lines:
class Utils:
    def __init__(self):
        self._raw_data = {}

    def raw_data(self):
        # No need to make a copy ???
        return self._raw_data.copy()

    def set_raw_data(self, key, data):
        self._raw_data[key] = data

def init_processes(utils_instance):
    """
    Initialize each process in the process pool with global variable utils.
    """
    global utils
    utils = utils_instance

class Parent:
    ...
    def evaluate_without_prefix(self, devices):
        results = []
        print(utils.raw_data())
        for network1, network2 in itertools.product(utils.raw_data()[devices[0]], utils.raw_data()[devices[1]]):
            results.append([network1, network2])
        return results

class Child(Parent):
    ...
    def execute(self, utils):
        pool = Pool(os.cpu_count() - 1, initializer=init_processes, initargs=(utils,))
        # No need to make an explicit list (map will do that for you) ???
        devices = list(itertools.combinations(list(utils.raw_data().keys()), 2))
        results = pool.map(super().evaluate_without_prefix, devices)
        return results

def main():
    utils = Utils()
    # Initialize utils:
    ...
    data = [ipaddress.IPv4Network(address) for address in ip_addresses]
    utils.set_raw_data(device_name, data)
    child = Child()
    results = child.execute(utils)

if __name__ == '__main__':
    main()
Further Explanation
The following program's main process calls class method Foo.set_x to update class attribute x to the value of 10 before creating a multiprocessing pool and invoking worker function worker, which prints out the value of Foo.x.
On Windows, which uses OS spawn to create new processes, the process in the pool is initialized prior to calling the worker function essentially by launching a new Python interpreter and re-executing the source program, executing every statement at global scope. Hence the class definition of Foo is created by the Python interpreter compiling it; there is no pickling involved. But Foo.x will be 0, because the call Foo.set_x(10) is guarded by if __name__ == '__main__': and is therefore not re-executed in the child.
The same program run on Linux, which uses OS fork to create new processes, gives each child a copy-on-write view of the main process's address space. Each child will therefore have a copy of the Foo class as it existed at the time the multiprocessing pool was created, and Foo.x will be 10.
My solution above, which uses a pool initializer to set a global variable in each pool process's address space to the value of the Util instance, is what is required on Windows and will also work on Linux. An alternative, of course, is to pass the Util instance as an additional argument to your worker function instead of using a pool initializer. But this is generally less efficient: the number of processes in the pool is usually smaller than the number of times the worker function is invoked, so the pool-initializer method requires less pickling.
from multiprocessing import Pool

class Foo:
    x = 0

    @classmethod
    def set_x(cls, x):
        cls.x = x

def worker():
    print(Foo.x)

if __name__ == '__main__':
    Foo.set_x(10)
    pool = Pool(1)
    pool.apply(worker)
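The Foo example can be made to behave the same on both platforms with the pool-initializer approach described above. A minimal sketch (init_pool is a hypothetical name for the initializer function):

```python
from multiprocessing import Pool

class Foo:
    x = 0

def init_pool(x_value):
    # Runs once in every pool process, so the value is copied across
    # explicitly whether the start method is spawn (Windows) or fork (Linux).
    Foo.x = x_value

def worker():
    return Foo.x

if __name__ == '__main__':
    Foo.x = 10
    with Pool(1, initializer=init_pool, initargs=(Foo.x,)) as pool:
        print(pool.apply(worker))  # 10 on every platform
```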
How to call a function that is inside another function using pool multiprocessing?
AttributeError: 'function' object has no attribute 'map'
We need to instantiate Pool from multiprocessing and call the map method of that pool object.
You have to move the inside method into a class, because Pool uses pickle to serialize and deserialize what it sends to its workers, and a function nested inside another function cannot be imported by pickle.
Pool needs to pickle (serialize) everything it sends to its worker-processes (IPC). Pickling actually only saves the name of a function, and unpickling requires re-importing the function by name. For that to work, the function needs to be defined at the top level; nested functions won't be importable by the child, and already trying to pickle them raises an exception.
from multiprocessing import Pool

class Wrap:
    def inside(self, a):
        print(a)

def main():
    pool = Pool()
    pool.map(Wrap().inside, ['Ok'] * 10)

if __name__ == '__main__':
    main()
If you don't want to wrap the inside method inside of a class, move the inside method to global scope so it can be pickled:
from multiprocessing import Pool

def inside(a):
    print(a)

def main():
    with Pool() as pool:
        pool.map(inside, ['Ok'] * 10)

if __name__ == '__main__':
    main()
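The pickling limitation quoted above can be demonstrated directly: a nested function is recorded by its qualified name, which cannot be re-imported, so pickling it fails immediately. A minimal sketch:

```python
import pickle

def outer():
    def inside(a):
        return a
    return inside

# The qualified name is 'outer.<locals>.inside', which pickle
# cannot look up at import time, so dumping raises an exception:
try:
    pickle.dumps(outer())
    print("pickling succeeded")
except Exception as exc:
    print("pickling failed:", type(exc).__name__)
```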
Multiprocessing: Passing a class instance to pool.map
At the gentle and patient prodding of martineau (thanks!) I think I've ironed out the problems. I have yet to apply it to my original code, but it is working in the example above and I'll start new questions for future implementation problems.
So in addition to changing where in the code the target file (the log, in this example) gets opened, I also started the QueueWriter instance as a single multiprocessing Process rather than using pool.map. As martineau pointed out, the map call blocks until qWriter.__call__() returns, and this prevented the workers from being called.
There were some other bugs in the code above, but those were incidental and fixed below:
import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs):
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print(self.path)
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")
        while 1:
            res = q.get()
            if res == 'kill':
                log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                log.write("This is where I'd write: {0} to grid file.\n".format(res))
        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(idx)
        self.queue.put((idx, res))
        return res

    def workhorse(self, idx):
        # in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]

if __name__ == '__main__':
    # log = open(os.path.expanduser('~/minimal.log'), 'w', 1)
    path = os.path.expanduser('~/minimal.log')
    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()
    grid = [random.random() for _ in range(10000)]
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter
    qWriter = QueueWriter(grid=grid, path=path)
    # watcher = pool.map(qWriter, (q,), 1)
    # Start the writer as a single process rather than a pool
    p = mp.Process(target=qWriter, args=(q,))
    p.start()
    wrkr = Worker(queue=q, grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    # result.get() is not required for pool.map
    q.put('kill')
    pool.close()
    p.join()
    pool.join()
Pool within a Class in Python
It looks like, because of the way the function gets passed to the worker processes (pickling), you can't use instance methods, unfortunately (at least in Python 2; Python 3 can pickle bound methods). My first thought was to use lambdas, but it turns out the built-in pickler can't serialize those either. The solution, sadly, is just to use a function in the global namespace. As suggested in other answers, you can use static methods and pass self to make it look more like an instance method.
from multiprocessing import Pool
from itertools import repeat

class SeriesInstance(object):
    def __init__(self):
        self.numbers = [1, 2, 3]

    def run(self):
        p = Pool()
        squares = p.map(self.F, self.numbers)
        multiples = p.starmap(self.G, zip(repeat(self), [2, 5, 10]))
        return (squares, multiples)

    @staticmethod
    def F(x):
        return x * x

    @staticmethod
    def G(self, m):
        return [m * n for n in self.numbers]

if __name__ == '__main__':
    print(SeriesInstance().run())
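Another workaround worth knowing: instances of top-level classes pickle cleanly, so defining __call__ lets the instance itself be the callable that Pool.map targets. A minimal sketch (Squarer is a hypothetical class for illustration):

```python
from multiprocessing import Pool

class Squarer:
    # The instance pickles fine (it is an instance of a top-level class),
    # and __call__ makes it usable wherever Pool.map expects a function.
    def __call__(self, x):
        return x * x

if __name__ == '__main__':
    with Pool() as pool:
        print(pool.map(Squarer(), [1, 2, 3]))  # [1, 4, 9]
```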
use multiprocessing pool to call a method of a class which has parametrized constructor
An easy manner to do it would be to create a function that takes care of the instantiation as well as the call:
def mp_func(user):
    a = A(user)
    a.api_call()

and call that function in your pool.
If you really want it to work with a function from your class, you can use a classmethod:
class A(object):
    def __init__(self, user):
        self.user = user

    def api_call(self):
        print(self.user)

    @classmethod
    def mp_create_and_call(cls, user):
        newcls = cls(user)
        newcls.api_call()

import multiprocessing as mp

pool = mp.Pool(4)
usernames = ['a', 'b', 'c'] * 20
pool.map(A.mp_create_and_call, usernames)
How to properly reference to instances of a class in Multiprocessing Pool.map?
After studying the multiprocessing documentation, I understood my misinterpretation of the concept.
With multiprocessing, even if an instance of a class is passed as an argument, it makes sense that its ID is different from the one in the calling method: we are now working in a different process altogether, so the object is a copy of the original and does not correspond to the same place in memory. Because of that, whatever changes are made to the copy have no impact on the original instance.
In order to use parallelism and share state, a different concept must be applied: multithreading, as available in the thread-based parallelism documentation. The difference between multithreading and multiprocessing has been thoroughly discussed here: Multiprocessing vs Threading Python
Returning to the original question, there are two easy ways to loop through the list and apply the function:
1. Using multiprocessing.dummy:
multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.
So the answer could be written as:
import multiprocessing.dummy as mp

p = mp.Pool(3)  # With 3 being the number of threads.
p.map(Kill_Animal, AnimalsList)
p.close()
p.join()

[print(animal.isAlive) for animal in AnimalsList]
Output: False False False False False
2. Using a Queue:
from queue import Queue
from threading import Thread

# Creates the hunter thread.
def hunter():
    while True:
        animal = q.get()
        Kill_Animal(animal)
        q.task_done()

num_hunter_threads = 3
q = Queue()

# Initialize the threads
for i in range(num_hunter_threads):
    t = Thread(target=hunter)
    t.daemon = True
    t.start()

# Adds each animal in the list to the Queue.
for animal in AnimalsList:
    q.put(animal)

# Execute the jobs in the queue.
q.join()

[print(animal.isAlive) for animal in AnimalsList]
Output: False False False False False
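If you do need separate processes rather than threads, shared mutable state can still be achieved through a multiprocessing.Manager, whose proxy objects forward mutations back to a server process. A minimal sketch, using manager dicts as hypothetical stand-ins for the Animal instances of the original question:

```python
from multiprocessing import Manager, Pool

def kill(animal):
    animal['isAlive'] = False  # the mutation travels through the manager proxy

if __name__ == '__main__':
    with Manager() as manager:
        # Proxy dicts are picklable and can be passed to pool workers.
        animals = [manager.dict(isAlive=True) for _ in range(5)]
        with Pool(3) as pool:
            pool.map(kill, animals)
        print([a['isAlive'] for a in animals])  # all False
```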