How to Use Multiprocessing Queue in Python

How to use multiprocessing queue in Python?

My main problem is that I really don't know how to implement multiprocessing.queue correctly. You cannot instantiate the object in each process, since they would then be separate queues. How do you make sure that all processes relate to a shared queue (or in this case, queues)?

This is a simple example of a reader and a writer sharing a single queue. The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends 'DONE', which tells the reader to break out of its read loop.

You can spawn as many reader processes as you like...

```python
from multiprocessing import Process, Queue
import time

def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue. A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###   you can spawn as many reader_p() as you like
        ###   however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=(qq,))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs

if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")
```

How to use Multiprocessing Queue with Lock

multiprocessing queues are thread- and process-safe.

They also block internally when needed (see the block and timeout parameters of the get()/put() methods).

You don't need a lock in your case.

```python
import multiprocessing, time, uuid, logging

log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)

def publish(q):
    for i in range(20):
        data = str(uuid.uuid4())
        q.put(data)
        log.info('published: %s to queue: %s' % (data, q))
        time.sleep(0.2)
    q.put(None)  # sentinel: tells the subscriber there is no more data

def subscribe(q):
    while True:
        data = q.get()  # blocks until an item is available
        if data is None:
            log.info('....... end of queue consumption')
            break
        log.info('.......got: %s to queue: %s' % (data, q))
        time.sleep(0.1)

if __name__ == '__main__':
    # The guard is required where processes are started with "spawn"
    # (Windows, and macOS by default since Python 3.8)
    queue = multiprocessing.Queue()

    publisher = multiprocessing.Process(target=publish, args=(queue,))
    publisher.start()

    subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
    subscriber.start()

    publisher.join()
    subscriber.join()
```

Sample output:

[INFO/Process-1] child process calling self.run()
[INFO/Process-1] published: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] .......got: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] process shutting down
[INFO/Process-2] ....... end of queue consumption
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[INFO/Process-2] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

Process finished with exit code 0

python multiprocessing queue implementation

This works for me (on Python 3). Instead of using a Pool, I spawn my own two processes:

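```python
from multiprocessing import Process, Queue
from time import sleep

id_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
NUM_WORKERS = 2
SENTINEL = None  # one per worker signals "no more work"

def mp_worker(queue):
    # queue.qsize() is unreliable (and raises NotImplementedError on macOS);
    # two workers can both see qsize() > 0 with one item left, and one then
    # blocks forever. Read until this worker receives its sentinel instead.
    while True:
        record = queue.get()
        if record is SENTINEL:
            break
        print(record)
        sleep(1)

    print("worker closed")

def mp_handler(queue):
    # Spawn two processes, assigning the method to be executed
    # and the input arguments (the queue)
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(NUM_WORKERS)]

    for process in processes:
        process.start()
        print('Process started')

    for process in processes:
        process.join()

if __name__ == '__main__':
    queue = Queue()  # created here so child processes don't re-create it

    for id in id_list:
        queue.put(id)
    for _ in range(NUM_WORKERS):
        queue.put(SENTINEL)

    mp_handler(queue)
```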

The list of elements to be processed (id_list) is hardcoded here, but it could just as well be passed in as an argument.

How to use Queue for multiprocessing with Python?

Queues are complicated beasts under the covers. When (a pickle of) an object is put on a queue, parts of it are fed into the underlying OS interprocess communication mechanism, but the rest is left in an in-memory Python buffer, to avoid overwhelming the OS facilities. The contents of the memory buffer are fed into the OS mechanism as the receiving end makes room for more by taking items off the queue.

A consequence is that a worker process cannot end before its memory buffers (feeding into queues) are empty.

In your first program, pickles of integers are so tiny that memory buffers don't come into play. A worker feeds the entire pickle to the OS in one gulp, and the worker can exit then.

But in your second program, the pickles are much larger. A worker sends part of the pickle to the OS, then waits for the main program to take it off the OS mechanism, so it can feed the next part of the pickle. Since your program never takes anything off the queue before calling .join(), the workers wait forever.

So, in general, this is the rule: never attempt to .join() until all queues have been drained.

Note this from the docs:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.

Also, queue.empty() is a poor way to test for this. That can only tell you if data is on the queue at the instant it happens to execute. In parallel processing, that's at best a probabilistic approximation to the truth. In your second example, you know exactly how many items you expect to get from the queue, so this way would be reliable:

```python
for proc in procs:
    proc.start()

for i in range(NTHREADS):
    print(queue.get().size)

for proc in procs:  # join AFTER queue is drained
    proc.join()
```

How to successfully utilize Queue.join() with multiprocessing?

Queues and Pools

Running one at a time... separate problem.

Actually, this is part of the same problem. All this implies that you are not
using a multiprocessing pool managed by Pool. What you do at the moment is to
put all your tasks in a queue, get them straight back out again, and then
process them one at a time using a pool, which only ever gets one task at a
time. These two paradigms are mutually exclusive: if you want to use a pool to
do the work for you, you don't need a queue; if you need to handle the queue
yourself, you probably don't want to use a pool.

Pool

multiprocessing.Pool and accompanying methods spawn the right number of worker
processes, serialise your function to them, and then set up a queue internally
and handle sending tasks and getting results. This is much easier than doing it
manually, and is normally the right way to go about things.

When you use a pool, you do something like this:

```python
results = pool.map(compute_primes, [(0, 100_000) for _ in range(8)])
```

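which blocks until the whole pool has finished, or:

```python
results = pool.map_async(compute_primes, [(0, 100_000) for _ in range(8)])
results.wait()  # wait
```

If you plan to process the results as they come in, you don't use
results.wait() at all. Note that results.get() on the object returned by
map_async() returns the complete list only once every task has finished, so
iterate over pool.imap_unordered() instead, which yields each result as soon as
it is ready:

```python
for result in pool.imap_unordered(compute_primes, [(0, 100_000) for _ in range(8)]):
    do_stuff(result)
```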

You still call pool.close() followed by pool.join() to make sure the pool is
shut down gracefully, but this has nothing to do with getting your results.

Your examples

Your first example works, because you do this:

  • put tasks in a queue
  • get them out one by one and process them
  • join an empty queue -> leave immediately

Your second example fails, because you do this:

  • put tasks in a queue
  • get one task out
  • wait for queue to be empty or done -> blocks indefinitely

In this case, you don't want a queue at all.

Using Queue manually

Aside: where are you getting your Queue from? multiprocessing.Queue is not
joinable; you need multiprocessing.JoinableQueue. queue.Queue, the standard
library's queue for threads, should not be used with multiprocessing.

When do you use a task queue? When you don't just want to apply a bunch of
arguments to a bunch of functions. Perhaps you want to use a custom class.
Perhaps you want to do something funny. Perhaps you want to do some things with
one type of argument, but something else if the argument is of a certain kind,
and code is better organised this way. In these cases subclassing Process (or
Thread for multithreading) yourself may be clearer and cleaner. None of that
seems to apply in this case.

Using join with queues

.join() is for task queues (multiprocessing.JoinableQueue). It blocks until every
task in the queue has been marked as done, i.e. until .task_done() has been
called once for every item put on the queue. This is handy when you want to
offload some processing to a bunch of processes, but wait for them before you
do anything else. So you normally do something like:

```python
tasks = JoinableQueue()
for t in qs:
    tasks.put(t)
start_multiprocessing()  # dummy fn
tasks.join()  # wait for everything to be done
```

However, in this case you neither do that nor want to do it.

Using multiprocessing and trying to use data in a queue and print the result from that data as soon as the data gets into the multiprocessing queue

You have a few issues with your code. The main ones are:

  1. Method multiprocessing.Queue.empty is not reliable and should not be used. Moreover, even if it were reliable, the queue would initially be empty, and it would also be empty after function_to_get_from_q takes an item off the queue and before whatever thread is putting data on the queue puts the next value, which only happens once every second.
  2. You have no code that actually takes data off the queue using method get.

See the following, heavily commented code for the changes you should make:

```python
import multiprocessing as mp
import datetime as dt

def function_to_get_from_q(queue):
    # Method empty on a multiprocessing queue is not reliable and so we use
    # a special sentinel value:
    while True:
        data = queue.get()
        if data is None:
            break
        # some computations on data in Queue
        result_of_computation = data  # placeholder: replace with your computation
        print(result_of_computation)

if __name__ == "__main__":
    # Move definition of q to here, since it should not be re-created in child processes.
    # If we did not put this queue creation within this block, then when
    # this code is run on platforms that create new processes with the spawn method,
    # a new queue would be created in the child process but never used:
    q = mp.Queue()
    process1 = mp.Process(target=function_to_get_from_q, args=(q,))
    process1.start()
    # Keep collecting the data from live stream, when 1 second is completed put data
    # in queue using q.put(data) and keep getting the live data.
    # We will put the special value of None on the queue if we wish to signal
    # to the child process that there will be no more data to be read and it
    # should therefore terminate.
    # Then:
    q.put(None)
    # Wait for all work to be completed.
    # process1 will terminate when it gets the sentinel value None
    # as data from the queue.
    process1.join()
```

How to use multiprocessing.Queue.get method?

I originally deleted this answer after I read @Martijn Pieters', since he described the "why this doesn't work" in more detail and earlier. Then I realized that the use case in the OP's example doesn't quite fit the canonical-sounding title of

"How to use multiprocessing.Queue.get method".

That's not because there's no child process involved in the demonstration, but because in real applications a queue is hardly ever pre-filled and only read out afterwards; instead, reading and writing happen interleaved, with waiting times in between. The extended demonstration code Martijn showed wouldn't work in the usual scenarios, because the while loop would break too soon when enqueuing doesn't keep up with reading. So here is the answer reloaded, which is able to deal with the usual interleaved feed-and-read scenarios:


Don't rely on queue.empty checks for synchronization.

After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.
...

empty()

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. (from the docs)

Either use for msg in iter(queue.get, sentinel): to .get() from the queue, breaking out of the loop when the sentinel value is received (this is the two-argument form, iter(callable, sentinel))...

```python
from multiprocessing import Queue

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    for msg in iter(queue.get, SENTINEL):
        print(msg)
```

...or use get_nowait() and handle a possible queue.Empty exception if you need a non-blocking solution.

```python
from multiprocessing import Queue
from queue import Empty
import time

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    while True:
        try:
            msg = queue.get_nowait()
            if msg == SENTINEL:
                break
            print(msg)
        except Empty:
            # do other stuff
            time.sleep(0.1)
```

If only one process, and only one thread within that process, is reading the queue, it would also be possible to replace the loop in the last code snippet with:

```python
while True:
    if not queue.empty():  # this is not an atomic operation ...
        msg = queue.get()  # ... thread could be interrupted in between
        if msg == SENTINEL:
            break
        print(msg)
    else:
        # do other stuff
        time.sleep(0.1)
```

Since a thread could drop the GIL in between checking if not queue.empty() and queue.get(), this wouldn't be suitable for multi-threaded queue-reads in a process. The same applies if multiple processes are reading from the queue.

For single-producer / single-consumer scenarios, using a multiprocessing.Pipe instead of multiprocessing.Queue would be sufficient and more performant, though.

Solutions to Python's multiprocessing Queue buffer deadlock? How to get from a multiprocessing Queue when it's full and continue multiprocessing?

After working with the multiprocessing library for a while, I've found that the simplest way to implement a robust multiprocessing queue is to use multiprocessing.Manager objects. From the docs:

Create a shared queue.Queue object and return a proxy for it.

Rather than allocating a separate thread for flushing data through a pipe, a Manager object runs a server process that holds a standard multithreading queue (queue.Queue). Each put() or get() on the returned proxy is a synchronous request to that server, so there is no per-process buffer that has to be flushed before a process can exit. This means your code can keep chugging away practically indefinitely.

None of this is free, and I've found that the managed queue operates much (almost 20x) slower than a multiprocessing queue in a simple test, though the difference isn't nearly as noticeable when the queue is integrated into a full system, due to other bottlenecks.

Using managed queues can make your IPC far more robust, and it's likely a good idea to take the performance trade-off unless you can find a way to live with the unreliability of a normal multiprocessing queue.


