Can I use a multiprocessing Queue in a function called by Pool.imap?
The trick is to pass the Queue as an argument to the initializer. Appears to work with all the Pool dispatch methods.
    import multiprocessing as mp

    def f(x):
        f.q.put('Doing: ' + str(x))
        return x * x

    def f_init(q):
        # Stash the queue on the function object so f can reach it in the worker
        f.q = q

    def main():
        jobs = range(1, 6)
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
        for i in range(len(jobs)):
            print(q.get())
            print(next(results))

    if __name__ == '__main__':
        main()
Multiprocessing pool and queues
multiprocessing.Pool will not accept a multiprocessing.Queue as an argument to its worker functions. I believe this is because it internally uses queues to send data back and forth to the worker processes. There are a couple of workarounds:

1) Do you really need to use a queue? One advantage of the Pool methods is that their return values are sent back to the main process. It is generally better to iterate over the return values from a pool than to use a separate queue. This also avoids the race condition introduced by checking queue.empty().

2) If you must use a Queue, you can use one from a multiprocessing.Manager. This is a proxy to a shared queue, which can be passed as an argument to the Pool methods.

3) You can pass a normal Queue to worker processes by using an initializer when creating the Pool (like https://stackoverflow.com/a/3843313). This is kinda hacky.
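As a sketch of workaround 1, results can be collected straight from the pool instead of through a queue (do_work here is just a stand-in for your real task):

```python
import multiprocessing

def do_work(x):
    # Return the result instead of putting it on a queue
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # imap yields results in input order as workers finish;
        # no separate queue (and no queue.empty() race) is needed
        for result in pool.imap(do_work, range(1, 6)):
            print(result)
```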
The race condition I mentioned above comes from:

    while not out_queue.empty():
        print("queue: ", out_queue.get())

When you have worker processes filling your queue, you can hit the condition where the queue is momentarily empty because a worker is about to put something into it. If you check .empty() at that moment, you will stop early. A better method is to put sentinel values in your queue to signal when you are finished putting data into it.
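A minimal sketch of the sentinel approach (names are illustrative): the consumer blocks on get() instead of polling .empty(), and a sentinel value marks the end of the data:

```python
import multiprocessing

SENTINEL = None  # any value the workers will never produce

def worker(in_queue, out_queue):
    while True:
        item = in_queue.get()        # blocks, so there is no .empty() race
        if item is SENTINEL:
            out_queue.put(SENTINEL)  # forward the sentinel to the consumer
            break
        out_queue.put(item * item)

if __name__ == '__main__':
    in_q, out_q = multiprocessing.Queue(), multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(in_q, out_q))
    p.start()
    for x in range(5):
        in_q.put(x)
    in_q.put(SENTINEL)               # signal that no more data is coming
    results = []
    item = out_q.get()
    while item is not SENTINEL:
        results.append(item)
        item = out_q.get()
    p.join()
    print(results)  # [0, 1, 4, 9, 16]
```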
Show the progress of a Python multiprocessing pool imap_unordered call?
There is no need to access private attributes of the result set:
    import sys

    for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
        sys.stderr.write('\rdone {0:%}'.format(i / num_tasks))
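Made self-contained (do_work and num_tasks are placeholders for your own task and count), that looks like:

```python
import sys
import multiprocessing

def do_work(x):
    return x * x  # stand-in for the real task

if __name__ == '__main__':
    num_tasks = 50
    with multiprocessing.Pool() as p:
        for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
            # '\r' rewinds to the start of the line so the percentage
            # overwrites itself in place
            sys.stderr.write('\rdone {0:%}'.format(i / num_tasks))
    sys.stderr.write('\n')
```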
Multiprocessing: How to use Pool.map on a function defined in a class?
I also was annoyed by the restrictions on what sort of functions pool.map can accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.

    from multiprocessing import Process, Pipe

    def spawn(f):
        def fun(pipe, x):
            pipe.send(f(x))
            pipe.close()
        return fun

    def parmap(f, X):
        pipe = [Pipe() for x in X]
        proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in zip(X, pipe)]
        [p.start() for p in proc]
        # Receive before joining: a child blocks on send() if the pipe buffer
        # fills up, so joining first can deadlock on large results.
        results = [p.recv() for (p, c) in pipe]
        [p.join() for p in proc]
        return results

    if __name__ == '__main__':
        print(parmap(lambda x: x**x, range(1, 5)))
How to use multiprocessing pool.map with multiple arguments
The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.1 It uses the Pool.starmap
method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:
    import multiprocessing
    from itertools import product

    def merge_names(a, b):
        return '{} & {}'.format(a, b)

    if __name__ == '__main__':
        names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
        with multiprocessing.Pool(processes=3) as pool:
            results = pool.starmap(merge_names, product(names, repeat=2))
        print(results)
        # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)
    import multiprocessing
    from itertools import product
    from contextlib import contextmanager

    def merge_names(a, b):
        return '{} & {}'.format(a, b)

    def merge_names_unpack(args):
        return merge_names(*args)

    @contextmanager
    def poolcontext(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()

    if __name__ == '__main__':
        names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
        with poolcontext(processes=3) as pool:
            results = pool.map(merge_names_unpack, product(names, repeat=2))
        print(results)
        # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.
    import multiprocessing
    from functools import partial
    from contextlib import contextmanager

    @contextmanager
    def poolcontext(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()

    def merge_names(a, b):
        return '{} & {}'.format(a, b)

    if __name__ == '__main__':
        names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
        with poolcontext(processes=3) as pool:
            results = pool.map(partial(merge_names, b='Sons'), names)
        print(results)
        # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.
Can I pass queue object in multiprocessing pool starmap method
Try using a queue from Manager(), like this:
    from multiprocessing import Manager, Pool

    def get_data(pageNo, q):
        q.put(pageNo * pageNo)

    if __name__ == "__main__":
        m = Manager()
        q = m.Queue()
        no_pages = 5
        pool_tuple = [(x, q) for x in range(1, no_pages)]
        with Pool(processes=3) as pool:
            pool.starmap(get_data, pool_tuple)
        for i in range(1, no_pages):
            print("result", i, ":", q.get())
Output:
result 1 : 1
result 2 : 4
result 3 : 9
result 4 : 16
multiprocessing apply_async strange behaviour with multiprocessing queue
You need to create a queue instance that can be shared among different processes. You can do this by using a multiprocessing.Manager object. The following code works:
    import multiprocessing

    def multi_thread(files):
        m = multiprocessing.Manager()
        q = m.Queue()
        for f in files:
            q.put(f)
        p = multiprocessing.Pool(5)
        for i in range(5):
            p.apply_async(worker_test, args=(i, q))
        p.close()
        p.join()

    def worker_test(i, q):
        print('hello')
        print(i)

    def main():
        files = ['a', 'b', 'c', 'd']
        multi_thread(files[0:4])

    if __name__ == '__main__':
        main()
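One caveat relevant to the "strange behaviour" in the question: apply_async swallows worker exceptions unless you keep the AsyncResult objects it returns and call .get() on them. A minimal sketch (the task function and its failure are illustrative):

```python
import multiprocessing

def task(i):
    if i == 3:
        raise ValueError('boom')  # illustrative failure
    return i * 10

if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        # Keep the AsyncResult objects; without calling .get() on them,
        # worker exceptions disappear without a trace
        async_results = [pool.apply_async(task, args=(i,)) for i in range(5)]
        for r in async_results:
            try:
                print(r.get(timeout=10))
            except ValueError as e:
                print('worker failed:', e)
```

If the original code's worker had raised (for example, because its target was not importable in the child), fire-and-forget apply_async would have reported nothing; retrieving each result surfaces the error.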