Sharing a Result Queue Among Several Processes

Try using multiprocessing.Manager to manage your queue and to also make it accessible to different workers.

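import multiprocessing

def worker(name, que):
    # Report completion through the shared (managed) queue.
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
    workers.get()   # wait for the task and re-raise any worker exception
    print(q.get())  # prints "33 is done"
    pool.close()
    pool.join()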

Sharing many queues among processes in Python

It sounds like your issues started when you tried to share a multiprocessing.Queue() by passing it as an argument. You can get around this by creating a managed queue instead:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()

When you use a manager to create it, you are storing and passing around a proxy to the queue, rather than the queue itself, so even when the object you pass to your worker processes is copied, it still points at the same underlying data structure: your queue. It's very similar (in concept) to pointers in C/C++. If you create your queues this way, you will be able to pass them when you launch a worker process.
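To make the pointer analogy concrete, here is a minimal sketch. It assumes that a pickle round trip is a fair stand-in for what happens when a proxy is sent to a worker; the function name proxy_copy_shares_queue is hypothetical.

```python
import multiprocessing
import pickle

def proxy_copy_shares_queue():
    """Put through a copied proxy, then get through the original proxy."""
    with multiprocessing.Manager() as manager:
        original = manager.Queue()
        # Pickling and unpickling produces a second, distinct proxy object.
        copy = pickle.loads(pickle.dumps(original))
        copy.put("sent via the copy")
        result = (copy is not original, original.get())
        # Release both proxies while the manager is still running.
        del copy, original
    return result

if __name__ == "__main__":
    is_distinct, message = proxy_copy_shares_queue()
    print(is_distinct, message)
```

The two proxies are different Python objects, yet the item put through one is read back through the other, because both refer to the single queue that lives in the manager process.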

Since you can pass queues around now, you no longer need your dictionary to be managed. Keep a normal dictionary in main that will store all the mappings, and only give your worker processes the queues they need, so they won't need access to any mappings.

I've written an example of this here. It looks like you are passing objects between your workers, so that's what's done here. Imagine we have two stages of processing, and the data both starts and ends in the control of main. Look at how we can create the queues that connect the workers like a pipeline; because each worker is given only the queues it needs, none of them has to know about any mappings:

import multiprocessing as mp

def stage1(q_in, q_out):
    q_out.put(q_in.get()+"Stage 1 did some work.\n")

def stage2(q_in, q_out):
    q_out.put(q_in.get()+"Stage 2 did some work.\n")

def main():
    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

The code produces this output:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

I didn't include an example of storing the queues or AsyncResult objects in dictionaries, because I still don't quite understand how your program is supposed to work. But now that you can pass your queues freely, you can build your dictionary to store the queue/process mappings as needed.

In fact, if you really do build a pipeline between multiple workers, you don't even need to keep a reference to the "inter-worker" queues in main. Create the queues, pass them to your workers, then only retain references to queues that main will use. I would definitely recommend trying to let old queues be garbage collected as quickly as possible if you really do have "an arbitrary number" of queues.
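As a rough illustration of keeping the mappings in main only, here is a minimal sketch. The worker function, the names used as dictionary keys, and the helper run are all hypothetical; your own program will map queues to workers differently.

```python
import multiprocessing as mp

def worker(name, inbox, outbox):
    """Hypothetical worker: doubles one number from its inbox and reports it."""
    number = inbox.get()
    outbox.put((name, number * 2))

def run(names):
    # The manager is entered first so it is shut down last, after the pool.
    with mp.Manager() as manager, mp.Pool(processes=len(names)) as pool:
        outbox = manager.Queue()
        # Plain dict kept only in main: worker name -> that worker's inbox.
        inboxes = {name: manager.Queue() for name in names}
        tasks = [pool.apply_async(worker, (name, inboxes[name], outbox))
                 for name in names]
        for number, name in enumerate(names):
            inboxes[name].put(number)
        for task in tasks:
            task.get()  # wait, and re-raise any worker exception
        return dict(outbox.get() for _ in names)

if __name__ == "__main__":
    print(run(["a", "b", "c"]))
```

Each worker receives exactly two queues and never sees the dictionary, so the dictionary itself does not have to be a managed object.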

How can I share a queue among three or more processes?

The comment offered by HALF9000 (using a multiprocessing.Queue) is an improvement over a managed queue, and there is a lot to be said for Mark Setchell's comment about going the Redis route if you will be doing a lot of this type of publish/subscribe work and you want something really robust. But that is quite a bit to bite off for a possibly one-off situation.

I believe the best-performing solution uses the under-utilized multiprocessing.Pipe on which the multiprocessing.Queue is built. It is not as flexible as a Queue because it really only supports one producer and one consumer, but that is all you need for your purposes and it is much more performant.

When function Pipe([duplex]) is called, it returns a pair (conn1, conn2) of multiprocessing.connection.Connection objects representing the ends of the pipe. If duplex is False, then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages. For this application you only need unidirectional connections. The idea is to pass to function read, as its second argument, a list of connections on which it should broadcast the data it has read to the various processes that need to process it.
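Before the full program, a minimal single-process sketch of the unidirectional behaviour described above may help. The function name demo_one_way_pipe is hypothetical; it only shows which end can send and which can receive.

```python
from multiprocessing import Pipe

def demo_one_way_pipe():
    """Send one message through a one-way pipe and try the wrong direction."""
    recv_end, send_end = Pipe(duplex=False)
    send_end.send("hello")
    received = recv_end.recv()
    try:
        # The receiving end is read-only, so sending on it is rejected.
        recv_end.send("wrong direction")
        reverse_ok = True
    except OSError:
        reverse_ok = False
    recv_end.close()
    send_end.close()
    return received, reverse_ok

if __name__ == "__main__":
    print(demo_one_way_pipe())
```

The full program that follows uses the same kind of one-way pipe for every link between processes.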

from multiprocessing import Process, Pipe
from threading import Thread
import time

def read(pathList, conn_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for conn in conn_list:
                conn.send(value)
    # tell every consumer that there is no more data
    for conn in conn_list:
        conn.send(None)

def calc0(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc0(data))
    des_conn.send(None)

def calc1(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc1(data))
    des_conn.send(None)

# dummy functions for testing

def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, conn):
    while True:
        data = conn.recv()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']

    res0_recv, res0_send = Pipe(False)
    data0_recv, data0_send = Pipe(False)
    res1_recv, res1_send = Pipe(False)
    data1_recv, data1_send = Pipe(False)
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_recv))
    t1 = Thread(target=process_results, args=(results1, res1_recv))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_send, data1_send]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_recv, res0_send))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_recv, res1_send))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)

Prints:

5000 AA aa 0.34799909591674805

Update

If all the various connections make the code a bit difficult to follow, we can hide the connections in a class, Efficient_Queue, which may make the code easier to read:

from multiprocessing import Process, Pipe
from threading import Thread
import time

class Efficient_Queue:
    def __init__(self):
        self._recv_conn, self._send_conn = Pipe(False)

    def put(self, obj):
        self._send_conn.send(obj)
        return self

    def get(self):
        return self._recv_conn.recv()

def read(pathList, q_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for q in q_list:
                q.put(value)
    # tell every consumer that there is no more data
    for q in q_list:
        q.put(None)

def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc0(data))
    des_q.put(None)

def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc1(data))
    des_q.put(None)

# dummy functions for testing

def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, q):
    while True:
        data = q.get()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']

    res0_q = Efficient_Queue()
    res1_q = Efficient_Queue()
    data0_q = Efficient_Queue()
    data1_q = Efficient_Queue()
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_q))
    t1 = Thread(target=process_results, args=(results1, res1_q))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_q, data1_q]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_q, res0_q))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_q, res1_q))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)

Prints:

5000 AA aa 0.3409993648529053

When Efficient_Queue instances are replaced by multiprocessing.Queue instances we get:

5000 AA aa 0.576676607131958

When multiprocessing.Queue instances are replaced by managed queues (i.e. m.Queue() where m is Manager()), we get:

5000 AA aa 2.8409862518310547
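If you want to reproduce this kind of comparison on your own machine, a minimal timing sketch could look like the following. It is not the program used for the numbers above: PipeQueue, producer and time_transfer are hypothetical names, and it measures a single producer feeding the main process, so the absolute figures will differ.

```python
import time
from multiprocessing import Manager, Pipe, Process, Queue

class PipeQueue:
    """One-producer, one-consumer queue over a one-way Pipe."""
    def __init__(self):
        self._recv_conn, self._send_conn = Pipe(False)

    def put(self, obj):
        self._send_conn.send(obj)

    def get(self):
        return self._recv_conn.recv()

def producer(q, n):
    for i in range(n):
        q.put(i)
    q.put(None)  # sentinel: no more data

def time_transfer(q, n):
    """Move n items from a child process to this one; return (count, seconds)."""
    start = time.perf_counter()
    p = Process(target=producer, args=(q, n))
    p.start()
    count = 0
    while q.get() is not None:
        count += 1
    p.join()
    return count, time.perf_counter() - start

if __name__ == "__main__":
    n = 5_000
    with Manager() as m:
        candidates = [
            ("Pipe-based queue", PipeQueue()),
            ("multiprocessing.Queue", Queue()),
            ("managed queue", m.Queue()),
        ]
        for label, q in candidates:
            count, elapsed = time_transfer(q, n)
            print(label, count, elapsed)
```

Timings depend heavily on the platform and on message size, so treat the output as a relative comparison only.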

Python multiprocessing: RuntimeError: Queue objects should only be shared between processes through inheritance

The Queue implementation in Python relies on a system pipe to transmit the data from one process to another, and on some semaphores to protect reads and writes on this pipe.

The pipe is handled as an open file in the process and can only be shared with a subprocess at spawning time, because of OS constraints.

The semaphores are also treated as files that should only be shared at spawning time, at least on UNIX-based systems, for early versions of Python.

As these two sub-objects cannot be shared in general, the Queue cannot be pickled and sent to a subprocess once it has been started.

However, on some operating systems and with recent versions of Python, it is possible to share the Connection and to create a shareable Semaphore. Thus, you could in theory create your own Queue that can be shared between processes. But it involves a lot of hacks and might not be very secure.
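The restriction can be seen directly with a minimal sketch. The helper names pickling_error, child and share_at_start are hypothetical. The first shows the error from the question's title when a plain Queue is pickled, the second shows the permitted route of handing the queue to a child when it is started.

```python
import multiprocessing
import pickle

def pickling_error(obj):
    """Return the RuntimeError message raised when pickling obj, or None."""
    try:
        pickle.dumps(obj)
    except RuntimeError as exc:
        return str(exc)
    return None

def child(q):
    q.put("sent from the child")

def share_at_start():
    """Hand a plain Queue to a child at start time, which is allowed."""
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    message = q.get()
    p.join()
    return message

if __name__ == "__main__":
    print(pickling_error(multiprocessing.Queue()))
    print(share_at_start())
```

Sending a plain Queue to an already running pool worker goes through ordinary pickling, which is why it fails, while a managed queue proxy does not have this limitation.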

Can I pass queue object in multiprocessing pool starmap method

Try using Manager() like this:

from multiprocessing import Manager, Pool

def get_data(pageNo, q):
    q.put(pageNo * pageNo)

if __name__ == "__main__":
    m = Manager()
    q = m.Queue()
    no_pages = 5
    pool_tuple = [(x, q) for x in range(1, no_pages)]
    with Pool(processes=3) as pool:
        pool.starmap(get_data, pool_tuple)
    # i counts the results as they are read; arrival order is not guaranteed
    for i in range(1, no_pages):
        print("result", i, ":", q.get())

Output:

result 1 : 1
result 2 : 4
result 3 : 9
result 4 : 16
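The queue hands results back in arrival order, which is not guaranteed to match page order when several workers run at once. One option, shown here as a minimal sketch, is to put the page number in the queue along with the value; the helper name collect is hypothetical.

```python
from multiprocessing import Manager, Pool

def get_data(page_no, q):
    # Tag each value with its page number so the reader can match them up.
    q.put((page_no, page_no * page_no))

def collect(no_pages):
    """Return {page_no: value} for pages 1 .. no_pages - 1."""
    with Manager() as m:
        q = m.Queue()
        with Pool(processes=3) as pool:
            pool.starmap(get_data, [(x, q) for x in range(1, no_pages)])
        # starmap has returned, so every result is already in the queue.
        return dict(q.get() for _ in range(1, no_pages))

if __name__ == "__main__":
    results = collect(5)
    for page_no in sorted(results):
        print("result", page_no, ":", results[page_no])
```

Because the results are keyed by page number, the printed lines stay in page order regardless of which worker finished first.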

