Dead simple example of using Multiprocessing Queue, Pool and Locking
The best solution for your problem is to utilize a Pool. Using Queues and having a separate "queue feeding" functionality is probably overkill.
Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:
import multiprocessing
import time
data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_worker(args):
    inputs, the_time = args  # map() passes each data item as a single argument
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()
Note that the mp_worker() function now accepts a single argument (the two previous arguments packed together), because map() passes each item of your input iterable as a single argument to your worker function; the pair is unpacked inside the worker.
Output:
Process a Waiting 2 seconds
Process b Waiting 4 seconds
Process a DONE
Process c Waiting 6 seconds
Process b DONE
Process d Waiting 8 seconds
Process c DONE
Process e Waiting 1 seconds
Process e DONE
Process f Waiting 3 seconds
Process d DONE
Process g Waiting 5 seconds
Process f DONE
Process h Waiting 7 seconds
Process g DONE
Process h DONE
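Under Python 3 there is also Pool.starmap(), which unpacks each tuple into separate arguments so the worker can keep a two-parameter signature. A minimal sketch of that variant, using the same data as above:

```python
import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(inputs, the_time):
    # starmap() unpacks each ['letter', 'seconds'] pair into two arguments
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        p.starmap(mp_worker, data)
```

The with-block closes and joins the pool automatically when the mapping is done.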
Edit as per @Thales comment below:
If you want "a lock for each pool limit" so that your processes run in tandem pairs, a la:
A waiting B waiting | A done, B done | C waiting, D waiting | C done, D done | ...
then change the handler function to launch pools (of 2 processes) for each pair of data:
def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))
        p.close()  # release each pool's workers once the pair is done
        p.join()
Now your output is:
Process a Waiting 2 seconds
Process b Waiting 4 seconds
Process a DONE
Process b DONE
Process c Waiting 6 seconds
Process d Waiting 8 seconds
Process c DONE
Process d DONE
Process e Waiting 1 seconds
Process f Waiting 3 seconds
Process e DONE
Process f DONE
Process g Waiting 5 seconds
Process h Waiting 7 seconds
Process g DONE
Process h DONE
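Creating a fresh Pool per pair works, but a single pool can be reused for every pair, since map() blocks until both tasks in the pair finish. A sketch of that variant (worker and data as in the original program):

```python
import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(args):
    inputs, the_time = args
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

def mp_handler():
    # one pool reused for every pair; map() blocks until both tasks
    # finish, so each pair completes before the next pair starts
    with multiprocessing.Pool(2) as p:
        for pair in zip(data[0::2], data[1::2]):
            p.map(mp_worker, pair)

if __name__ == '__main__':
    mp_handler()
```

This gives the same paired output without the overhead of spawning a new pool of worker processes for every two tasks.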
How to use multiprocessing queue in Python?
My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You can't really instantiate the object in each process, since those would be separate queues; how do you make sure that all processes work with a shared queue (or in this case, queues)?
This is a simple example of a reader and writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends 'DONE', which lets the reader know to break out of the read loop.
You can spawn as many reader processes as you like...
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue. A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue
    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_proc() reads from qq as a separate process...
        ### you can spawn as many reader_proc() as you like,
        ### however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=(qq,))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_proc() as another proc
        all_reader_procs.append(reader_p)
    return all_reader_procs
if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)
        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader procs
        print("All reader processes are pulling numbers from the queue...")
        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish
            print("        reader_p() idx:%s is done" % idx)
        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")
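An alternative to sending one "DONE" sentinel per reader is multiprocessing.JoinableQueue, where the writer blocks in q.join() until every item has been acknowledged with task_done(). A minimal sketch of that pattern:

```python
from multiprocessing import JoinableQueue, Process

def reader_proc(queue):
    """Consume items forever, acknowledging each one with task_done()."""
    while True:
        msg = queue.get()
        # ... process msg here ...
        queue.task_done()  # tell the queue this item is fully handled

if __name__ == '__main__':
    q = JoinableQueue()
    for _ in range(2):
        reader_p = Process(target=reader_proc, args=(q,))
        reader_p.daemon = True  # reaped automatically when main exits
        reader_p.start()
    for ii in range(1000):
        q.put(ii)
    q.join()  # blocks until every put() has a matching task_done()
    print("all items consumed")
```

Because the readers are daemons, they don't need a shutdown sentinel at all; they are simply terminated when the main process exits after q.join() returns.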
How do I use multiprocessing Pool and Queue together?
Even though you store all your useful results in the queue output, you have to fetch each result by calling output.get() as many times as items were stored in it (the number of training examples, len(c) in your case). For me it works if you change the line:
print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
to:
print('storing output of calculations...')
p = pd.DataFrame([output.get() for _ in range(len(c))]) ## <-- no longer breaks
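Stepping back, with a Pool you often don't need a manual output queue at all: the pool collects each worker's return value for you. A sketch of that simplification (train_one is a hypothetical stand-in for the real per-example computation, which isn't shown here):

```python
import multiprocessing

def train_one(example):
    # stand-in for the real per-example computation
    return example * example

if __name__ == '__main__':
    examples = list(range(8))
    with multiprocessing.Pool(4) as pool:
        # map() returns one result per input, in input order
        results = pool.map(train_one, examples)
    print(results)  # -> [0, 1, 4, 9, 16, 25, 36, 49]
```

The results list can then be handed straight to pd.DataFrame() with no output.get() bookkeeping.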
Can I use a multiprocessing Queue in a function called by Pool.imap?
The trick is to pass the Queue as an argument to the initializer. Appears to work with all the Pool dispatch methods.
import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x * x

def f_init(q):
    # runs once in each worker process; the inherited queue is stored
    # as a function attribute so f() can reach it
    f.q = q

def main():
    jobs = range(1, 6)
    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()
    for i in range(len(jobs)):
        print(q.get())
        print(next(results))

if __name__ == '__main__':
    main()
multiprocessing.Queue as arg to pool worker aborts execution of worker
Problem
When you call apply_async, it returns an AsyncResult object and leaves the workload distribution to a separate thread (see also this answer). This thread encounters the problem that the Queue object can't be pickled, and therefore the requested work can't be distributed (and eventually executed). We can see this by calling AsyncResult.get:
r = p.apply_async(f,args=(q,))
r.get()
which raises a RuntimeError:
RuntimeError: Queue objects should only be shared between processes through inheritance
However, this RuntimeError is only raised in the main thread once you request the result, because it actually occurred in a different thread (and thus needs a way to be transmitted).
So what happens when you do
p.apply_async(f,args=(q,))
is that the target function f is never invoked, because one of its arguments (q) can't be pickled. Therefore q never receives an item, remains empty, and for that reason the call to q.get in the main thread blocks forever.
Solution
With apply_async you don't have to manage the result queues manually; they are readily provided to you in the form of AsyncResult objects. So you can modify the code to simply return from the target function:
from multiprocessing import Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())
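If the target function really does need a queue argument, a multiprocessing.Manager().Queue() proxy is picklable and can be passed through apply_async. A sketch of that alternative:

```python
from multiprocessing import Manager, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    with Manager() as m, Pool(1) as p:
        q = m.Queue()  # a managed proxy object, safe to pickle
        p.apply_async(f, args=(q,)).get()  # .get() re-raises any worker error
        print(q.get())  # -> [42, None, 'hello']
```

The manager runs the queue in a server process and hands out lightweight proxies, which is slower than a plain Queue but sidesteps the inheritance restriction entirely.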