Dead Simple Example of Using Multiprocessing Queue, Pool and Locking

The best solution for your problem is to use a Pool. Using Queues and a separate "queue feeding" function is probably overkill.

Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments), because Pool.map() passes each element of the input iterable to the worker as a single argument.

Output:

 Process a  Waiting 2 seconds
 Process b  Waiting 4 seconds
 Process a  DONE
 Process c  Waiting 6 seconds
 Process b  DONE
 Process d  Waiting 8 seconds
 Process c  DONE
 Process e  Waiting 1 seconds
 Process e  DONE
 Process f  Waiting 3 seconds
 Process d  DONE
 Process g  Waiting 5 seconds
 Process f  DONE
 Process h  Waiting 7 seconds
 Process g  DONE
 Process h  DONE
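
If you are on Python 3, note that tuple parameters in a function signature (def mp_worker((inputs, the_time))) are no longer valid syntax and print is a function. A minimal sketch of the same program adapted for Python 3, using starmap() so the worker can keep two separate parameters:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(inputs, the_time):
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

def mp_handler():
    with multiprocessing.Pool(2) as p:
        p.starmap(mp_worker, data)  # each ['x', 'n'] pair is unpacked into two arguments

if __name__ == '__main__':
    mp_handler()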

Edit, as per @Thales's comment below:

If you want "a lock for each pool limit" so that your processes run in tandem pairs, like so:

A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...

then change the handler function to launch pools (of 2 processes) for each pair of data:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

Now your output is:

 Process a  Waiting 2 seconds
 Process b  Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Process c  Waiting 6 seconds
 Process d  Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Process e  Waiting 1 seconds
 Process f  Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Process g  Waiting 5 seconds
 Process h  Waiting 7 seconds
 Process g  DONE
 Process h  DONE
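
A caveat on the loop above: it creates a fresh Pool on every iteration and never closes it. That works, because map() blocks until the pair has finished, but on Python 3 you can let a with block clean up each pool before the next pair starts. A sketch, reusing the two-parameter mp_worker() and starmap() from the Python 3 variant shown earlier:

def mp_handler():
    # Pair up the data: (['a','2'], ['b','4']), (['c','6'], ['d','8']), ...
    for task1, task2 in zip(data[0::2], data[1::2]):
        with multiprocessing.Pool(2) as p:        # the pool is torn down when the block exits
            p.starmap(mp_worker, (task1, task2))  # blocks until both tasks in the pair are done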

How to use multiprocessing queue in Python?

My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You cannot really instantiate the object in each process, since they would be separate queues. How do you make sure that all processes work with a shared queue (or, in this case, queues)?

This is a simple example of a reader and writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends 'DONE', which lets the reader know to break out of the read loop.

You can spawn as many reader processes as you like...

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue. A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_proc() reads from qq as a separate process...
        ### you can spawn as many reader_proc() as you like
        ### however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs

if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print(" Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print(" reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")

How do I use multiprocessing Pool and Queue together?

Even though you store all your useful results in the queue output, you want to fetch those results by calling output.get() the same number of times that items were stored in it (the number of training examples, len(c) in your case). For me it works if you change the line:

print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable

to:

print('storing output of calculations...')
p = pd.DataFrame([output.get() for _ in range(len(c))]) ## <-- no longer breaks
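
For context, here is a self-contained sketch of that overall pattern; the names worker, c and output below are illustrative stand-ins, not the original poster's code. A Manager queue is used because a plain multiprocessing.Queue cannot be passed directly to pool workers (see the last answer on this page). The key point is that output.get() is called exactly len(c) times, once per result that was put:

import multiprocessing
import pandas as pd

def worker(item, output):
    output.put(item * 2)          # store exactly one result per input item

if __name__ == '__main__':
    c = [1, 2, 3, 4, 5]           # stand-in for the training examples
    manager = multiprocessing.Manager()
    output = manager.Queue()      # a Manager queue can safely be passed to pool workers

    pool = multiprocessing.Pool(2)
    for item in c:
        pool.apply_async(worker, (item, output))
    pool.close()
    pool.join()                   # wait until every worker has stored its result

    p = pd.DataFrame([output.get() for _ in range(len(c))])  # one get() per stored item
    print(p)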

Can I use a multiprocessing Queue in a function called by Pool.imap?

The trick is to pass the Queue as an argument to the Pool initializer. It appears to work with all the Pool dispatch methods.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
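
That example is written for Python 2 (print statements, results.next()). A sketch of the same initializer trick in Python 3 syntax:

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))   # f.q was attached by f_init() in each worker process
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1, 6)

    q = mp.Queue()
    with mp.Pool(None, f_init, [q]) as p:  # initializer=f_init, initargs=[q]
        results = p.imap(f, jobs)
        p.close()
        for _ in jobs:
            print(q.get())
            print(next(results))

if __name__ == '__main__':
    main()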

multiprocessing.Queue as arg to pool worker aborts execution of worker

Problem

When you call apply_async, it returns an AsyncResult object and leaves the workload distribution to a separate thread (see also this answer). That thread runs into the problem that the Queue object can't be pickled, and therefore the requested work can't be distributed (and is never executed). We can see this by calling AsyncResult.get:

r = p.apply_async(f,args=(q,))
r.get()

which raises a RuntimeError:

RuntimeError: Queue objects should only be shared between processes through inheritance

However, this RuntimeError is only raised in the main thread once you request the result, because it actually occurred in a different thread (and thus needed a way to be transmitted back).

So what happens when you do

p.apply_async(f,args=(q,))

is that the target function f is never invoked, because one of its arguments (q) can't be pickled. As a result, q never receives an item and remains empty, and that is why the call to q.get in the main thread blocks forever.
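
On Python 3, apply_async() also takes an error_callback; since the pickling failure described above is recorded on the AsyncResult, the callback is a convenient way to see it without blocking on q.get(). A minimal sketch, assuming a worker f(q) that takes the queue as an argument, as in the question:

from multiprocessing import Pool, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    with Pool(1) as p:
        r = p.apply_async(f, args=(q,), error_callback=print)  # prints the RuntimeError
        r.wait()                                               # returns once the failure is recorded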

Solution

With apply_async you don't have to manage result queues manually; results are readily provided to you in the form of AsyncResult objects. So you can modify the code to simply return the value from the target function:

from multiprocessing import Queue, Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())
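
If you genuinely need a queue that pool workers write to (rather than just returned values), a multiprocessing.Manager().Queue() can be pickled and therefore passed to apply_async directly. A minimal sketch:

from multiprocessing import Manager, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    manager = Manager()
    q = manager.Queue()                      # a proxy object, safe to send to pool workers
    with Pool(1) as p:
        p.apply_async(f, args=(q,)).get()    # .get() also re-raises any error from the worker
    print(q.get())                           # -> [42, None, 'hello']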

