How to Use a Multiprocessing Queue in a Function Called by Pool.imap

Can I use a multiprocessing Queue in a function called by Pool.imap?

The trick is to pass the Queue as an argument to the pool's initializer. This appears to work with all of the Pool dispatch methods.

import multiprocessing as mp

def f(x):
    # The queue was attached to the function object by f_init in each worker.
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    # Runs once per worker process; stores the queue where f can reach it.
    f.q = q

def main():
    jobs = range(1, 6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print(q.get())
        print(next(results))

if __name__ == '__main__':
    main()

Multiprocessing pool and queues

multiprocessing.Pool will not accept a multiprocessing.Queue as an argument to its worker functions. I believe this is because it internally uses a queue to send data back and forth to the worker processes, and a multiprocessing.Queue cannot be pickled and sent that way. There are a couple of workarounds:

1) Do you really need to use a queue? One advantage of the Pool functions is that their return values are sent back to the main process. It is generally better to iterate over the return values from a pool than to use a separate queue. This also avoids the race condition introduced by checking queue.empty() (see the sketch after this list).

2) If you must use a Queue, you can use one from multiprocessing.Manager. This is a proxy to a shared queue which can be passed as an argument to the Pool functions.

3) You can pass a normal Queue to worker processes by using an initializer when creating the Pool (like https://stackoverflow.com/a/3843313). This is somewhat hacky.
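
A minimal sketch of workaround 1, assuming a placeholder task function do_work that is not from the answers above:

import multiprocessing

def do_work(x):
    # Whatever you would have put on a queue is simply returned instead.
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # imap yields results back to the main process as workers finish,
        # so no separate queue (and no queue.empty() polling) is needed.
        for result in pool.imap(do_work, range(10)):
            print(result)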

The race condition I mentioned above comes from:

while not out_queue.empty():
    print("queue: ", out_queue.get())

When you have worker processes filling your queue, you can hit a moment where the queue is empty only because a worker is about to put something into it. If you check .empty() at that moment, the loop exits early. A better method is to put sentinel values in your queue to signal when you are finished putting data into it.
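
A minimal sketch of the sentinel approach; the worker function and the use of None as the sentinel are illustrative assumptions, not from the original answer:

import multiprocessing

SENTINEL = None  # assumed marker; any value the workers never produce will do

def worker(in_queue, out_queue):
    # Consume items until the sentinel arrives, then pass it along and stop.
    while True:
        item = in_queue.get()
        if item is SENTINEL:
            out_queue.put(SENTINEL)
            break
        out_queue.put(item * item)

if __name__ == '__main__':
    in_queue, out_queue = multiprocessing.Queue(), multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
    p.start()

    for x in range(5):
        in_queue.put(x)
    in_queue.put(SENTINEL)

    # Read until the sentinel comes back instead of polling out_queue.empty().
    while True:
        result = out_queue.get()
        if result is SENTINEL:
            break
        print(result)

    p.join()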

Show the progress of a Python multiprocessing pool imap_unordered call?

There is no need to access private attributes of the result set:

import sys

# Here `p` is an existing multiprocessing.Pool, `do_work` the task function,
# and `num_tasks` the total number of tasks.
for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i / num_tasks))
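
Wired into a complete, runnable sketch (the do_work body and the num_tasks value below are placeholders, not part of the original answer):

import sys
import multiprocessing

def do_work(x):
    return x * x

if __name__ == '__main__':
    num_tasks = 100
    with multiprocessing.Pool() as p:
        # enumerate(..., 1) counts tasks as imap_unordered yields them,
        # regardless of completion order.
        for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
            sys.stderr.write('\rdone {0:.0%}'.format(i / num_tasks))
    sys.stderr.write('\n')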

Multiprocessing: How to use Pool.map on a function defined in a class?

I was also annoyed by the restrictions on what sort of functions pool.map can accept, so I wrote the following to circumvent them. It appears to work, even for recursive use of parmap.

from multiprocessing import Process, Pipe

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipes = [Pipe() for _ in X]
    procs = [Process(target=spawn(f), args=(child, x))
             for x, (parent, child) in zip(X, pipes)]
    for p in procs:
        p.start()
    # Receive before joining so a large result cannot block a child on
    # pipe.send() and deadlock the join.
    results = [parent.recv() for (parent, child) in pipes]
    for p in procs:
        p.join()
    return results

if __name__ == '__main__':
    print(parmap(lambda x: x**x, range(1, 5)))
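
For the question actually being asked, methods defined on a class, a bound method can be handed straight to parmap, since under the fork start method (the Linux default) nothing is pickled. A hedged sketch; the Calculator class is an illustrative assumption and relies on the parmap definition above:

class Calculator:
    def __init__(self, offset):
        self.offset = offset

    def compute(self, x):
        return x * x + self.offset

if __name__ == '__main__':
    calc = Calculator(offset=10)
    # Bound methods (and even lambdas) work because the children are forked,
    # not fed through pickle the way Pool.map arguments are.
    print(parmap(calc.compute, range(1, 5)))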

How to use multiprocessing pool.map with multiple arguments

The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.1 It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.

Can I pass queue object in multiprocessing pool starmap method

Try using Manager() like this:

from multiprocessing import Manager, Pool

def get_data(pageNo, q):
    q.put(pageNo * pageNo)

if __name__ == "__main__":
    m = Manager()
    q = m.Queue()  # a managed queue proxy can be pickled and passed to workers
    no_pages = 5
    pool_tuple = [(x, q) for x in range(1, no_pages)]
    with Pool(processes=3) as pool:
        pool.starmap(get_data, pool_tuple)
    for i in range(1, no_pages):
        print("result", i, ":", q.get())

Output:

result 1 : 1
result 2 : 4
result 3 : 9
result 4 : 16

multiprocessing apply_async strange behaviour with multiprocessing queue

You need to create a queue instance that can be shared among different processes. You can do this by using a multiprocessing.Manager object.

The following code seems to work:

import multiprocessing

def multi_thread(files):
    # A managed queue proxy can be pickled, so it survives apply_async's
    # argument pickling where a plain multiprocessing.Queue would fail.
    m = multiprocessing.Manager()
    q = m.Queue()

    for f in files:
        q.put(f)

    p = multiprocessing.Pool(5)
    for i in range(5):
        p.apply_async(worker_test, args=(i, q))

    p.close()
    p.join()

def worker_test(i, q):
    print('hello')
    print(i)

def main():
    files = ['a', 'b', 'c', 'd']

    multi_thread(files[0:4])

if __name__ == '__main__':
    main()
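
One caveat worth adding: apply_async is fire-and-forget unless you keep the AsyncResult objects it returns, so exceptions raised in (or while dispatching to) a worker can pass silently, which is likely why passing a plain multiprocessing.Queue appeared to do nothing. A hedged sketch with a hypothetical worker function that keeps the results so errors surface:

import multiprocessing

def worker(i, q):
    q.put(i * i)
    return i

if __name__ == '__main__':
    m = multiprocessing.Manager()
    q = m.Queue()
    with multiprocessing.Pool(5) as p:
        # Keep each AsyncResult; .get() re-raises any exception from the
        # worker instead of swallowing it.
        results = [p.apply_async(worker, args=(i, q)) for i in range(5)]
        for r in results:
            print('returned:', r.get(timeout=30))
    # All workers have finished, so exactly five items are on the queue.
    for _ in range(5):
        print('queued:', q.get())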

