Filling a Queue and Managing Multiprocessing in Python

Fill a Queue with Objects from several data loaders using multiprocessing

Your specific error means that you cannot have a Pool as part of your class when you are passing class methods to a pool (the Pool object itself cannot be pickled along with the instance). What I would suggest is the following:

import multiprocessing as mp
from queue import Empty

class QueueGenerator(object):
    def __init__(self, data_loader_list):
        self.data_loader_list = data_loader_list
        self.queue = mp.Queue(maxsize=16)

    def __iter__(self):
        processes = list()
        for _ in range(4):
            pr = mp.Process(target=fill_queue, args=(self.queue, self.data_loader_list))
            pr.start()
            processes.append(pr)
        return self

    def __next__(self):
        try:
            # The timeout should have a value, otherwise this loop will never stop.
            # Make it long enough that your processes have time to update the queue,
            # but not so long that your program freezes for an extended period of time
            # after all information is processed.
            return self.queue.get(timeout=1)
        except Empty:
            raise StopIteration

# have fill_queue as a separate, module-level function
def fill_queue(queue, gen):
    while True:
        try:
            value = next(gen)
            queue.put(value)
        except StopIteration:  # assumes the given data_loader_list is an iterator
            break
    print('stopping')

gen = iter(range(70))

qg = QueueGenerator(gen)

for val in qg:
    print(val)

# test if it works several times:
for val in qg:
    print(val)

The next issue for you to solve, I think, is to make data_loader_list something that provides new information in every separate process. Since you haven't given any details about it, I can't help you with that specifically, but the above does give you a way to have the processes fill your queue, which is then handed out as an iterator. A sketch of one possible direction follows.
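
The sketch below assumes data_loader_list is a list of independent, picklable iterables, one per worker process; that split is hypothetical, not something from the question. The idea is simply to hand each worker its own loader, so every process produces distinct data instead of copies of the same iterator.

import multiprocessing as mp
from queue import Empty

def fill_queue(queue, loader):
    # Each worker drains only its own loader, so no two processes
    # compete for the same underlying data source.
    for value in loader:
        queue.put(value)

class QueueGenerator(object):
    def __init__(self, data_loader_list):
        # assumed: one independent, picklable iterable per worker process
        self.data_loader_list = data_loader_list
        self.queue = mp.Queue(maxsize=16)

    def __iter__(self):
        for loader in self.data_loader_list:
            mp.Process(target=fill_queue, args=(self.queue, loader)).start()
        return self

    def __next__(self):
        try:
            return self.queue.get(timeout=1)
        except Empty:
            raise StopIteration

if __name__ == '__main__':
    # hypothetical split: four loaders covering different slices of the data
    qg = QueueGenerator([range(0, 20), range(20, 40), range(40, 60), range(60, 70)])
    for val in qg:
        print(val)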

Python Multiprocessing: Topping off multiprocessing queue before becoming empty

Why don't you use a multiprocessing Pool to accomplish this?

import multiprocessing
pool = multiprocessing.Pool()
pool.map(your_function, dataset)  # dataset is a list; could be any other iterable object
pool.close()
pool.join()

The multiprocessing.Pool() can take the argument processes=#, where you specify the number of jobs you want to start. If you don't specify this parameter, it will start as many jobs as you have cores (so if you have 4 cores, 4 jobs). When one job finishes, it will automatically start the next one; you don't have to manage that.
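
For example, a minimal sketch of capping the pool size (the body of your_function below is just a placeholder):

import multiprocessing

def your_function(x):
    # placeholder work; replace with whatever each job should do
    return x * x

if __name__ == '__main__':
    dataset = range(100)
    pool = multiprocessing.Pool(processes=4)  # at most 4 worker processes
    results = pool.map(your_function, dataset)
    pool.close()
    pool.join()
    print(results[:5])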

Multiprocessing: https://docs.python.org/2/library/multiprocessing.html

Multiprocessing Queue in Python

try this:

import multiprocessing

num_procs = 4

def do_work(message):
    print("work", message, "completed")

def worker():
    for item in iter(q.get, None):
        do_work(item)
        q.task_done()
    q.task_done()  # acknowledge the final None sentinel as well

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
    procs.append(multiprocessing.Process(target=worker))
    procs[-1].daemon = True
    procs[-1].start()

source = ['hi', 'there', 'how', 'are', 'you', 'doing']
for item in source:
    q.put(item)

q.join()

for p in procs:
    q.put(None)

q.join()

for p in procs:
    p.join()

print("Finished everything....")
print("num active children:", multiprocessing.active_children())

Working with deque object across multiple processes

Here's an example of how to share something between processes by extending the multiprocessing.managers.BaseManager class to support deques.

There's a Customized managers section in the documentation about creating them.

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])

process_shared_deque = None  # Global only within each process.

def my_init(q):
    """ Initialize module-level global. """
    global process_shared_deque
    process_shared_deque = q
    q.append("Hello world")

def map_fn(i):
    process_shared_deque.append(i)  # deques don't have a "put()" method.

if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

Output (my_init runs once in each of the three pool workers, which is why "Hello world" appears three times):

Hello world
0
1
2
Hello world
Hello world
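
If you need more of deque's API than the subset registered above, the same pattern extends: any method you want callable through the proxy has to exist on DequeProxy and appear in the exposed list. A minimal sketch, with extend added purely for illustration:

import collections
from multiprocessing.managers import BaseManager

class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def extend(self, iterable):  # extra method, added for illustration only
        self.deque.extend(iterable)
    def popleft(self):
        return self.deque.popleft()

# Every method callable through the proxy must be listed in 'exposed'.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'extend', 'popleft'])

if __name__ == '__main__':
    manager = DequeManager()
    manager.start()
    d = manager.DequeProxy()
    d.extend([1, 2, 3])
    print(len(d), d.popleft())  # -> 3 1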

Best way to wait for queue population python multiprocessing

What you're asking for is the default behavior of queue.get: it will wait (block) until an item is available from the queue. Sending a sentinel value is indeed the preferred way to end a child process.

Your scenario could be simplified to something like this:

import random
import time
from multiprocessing import Manager, Process

def save_data(save_que, file_):
    for data in iter(save_que.get, 'STOP'):
        print("saving data", data)
    print("All data saved")
    return

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)
    save_que.put("STOP")

if __name__ == '__main__':

    manager = Manager()
    save_que = manager.Queue()
    file_ = "file"
    save_p = Process(target=save_data, args=(save_que, file_))
    save_p.start()
    produce_data(save_que)
    save_p.join()

Edit to answer the question in the comment:

How should I implement the stop message in case the queue is accessed by several different agents and each one has a randomized time for finishing its task?

It's not much different: you have to put as many sentinel values into the queue as you have consumers.

A utility function which returns a stream logger, so you can see where the action is:

import logging

def get_stream_logger(level=logging.DEBUG):
    """Return logger with configured StreamHandler."""
    stream_logger = logging.getLogger('stream_logger')
    stream_logger.handlers = []
    stream_logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    fmt = '[%(asctime)s %(levelname)-8s %(processName)s] --- %(message)s'
    formatter = logging.Formatter(fmt)
    sh.setFormatter(formatter)
    stream_logger.addHandler(sh)

    return stream_logger

Code with multiple consumers:

import random
import time
from multiprocessing import Manager, Process
import logging

def save_data(save_que, file_):
    stream_logger = get_stream_logger()
    for data in iter(save_que.get, 'STOP'):
        time.sleep(random.randint(1, 5))  # random delay
        stream_logger.debug(f"saving: {data}")  # DEBUG
    stream_logger.debug("all data saved")  # DEBUG
    return

def produce_data(save_que, n_workers):
    stream_logger = get_stream_logger()
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        stream_logger.debug(f"producing: {data}")  # DEBUG
        save_que.put(data)

    for _ in range(n_workers):
        save_que.put("STOP")

if __name__ == '__main__':

    file_ = "file"
    n_processes = 2

    manager = Manager()
    save_que = manager.Queue()

    processes = []
    for _ in range(n_processes):
        processes.append(Process(target=save_data, args=(save_que, file_)))

    for p in processes:
        p.start()

    produce_data(save_que, n_workers=n_processes)

    for p in processes:
        p.join()

Example output:

[2018-09-02 20:10:35,885 DEBUG    MainProcess] --- producing: 2
[2018-09-02 20:10:38,887 DEBUG    MainProcess] --- producing: 8
[2018-09-02 20:10:38,887 DEBUG    Process-2] --- saving: 2
[2018-09-02 20:10:39,889 DEBUG    MainProcess] --- producing: 8
[2018-09-02 20:10:40,889 DEBUG    Process-3] --- saving: 8
[2018-09-02 20:10:40,890 DEBUG    Process-2] --- saving: 8
[2018-09-02 20:10:42,890 DEBUG    MainProcess] --- producing: 1
[2018-09-02 20:10:43,891 DEBUG    Process-3] --- saving: 1
[2018-09-02 20:10:46,893 DEBUG    MainProcess] --- producing: 5
[2018-09-02 20:10:46,894 DEBUG    Process-3] --- all data saved
[2018-09-02 20:10:50,895 DEBUG    Process-2] --- saving: 5
[2018-09-02 20:10:50,896 DEBUG    Process-2] --- all data saved

Process finished with exit code 0

Simple process manager using multiprocessing in Python

As far as I can tell, your main would just become:

def main():
    tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with multiprocessing.Pool(POOL_SIZE) as pool:
        pool.map(sleep, tasks)

That is, you've just reimplemented a pool, but inefficiently (Pool reuses processes where possible) and less safely: Pool goes to a lot of effort to clean up on exceptions.
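
A minimal sketch of that second point (sleep, POOL_SIZE, and the deliberately failing task are stand-ins, not the original code): when a worker raises, the with block re-raises the error in the parent and still shuts the pool down cleanly.

import multiprocessing
import time

POOL_SIZE = 4  # stand-in for whatever the original constant was

def sleep(task):
    if task == 7:  # hypothetical failing task
        raise ValueError("task 7 failed")
    time.sleep(0.1)
    return task

if __name__ == '__main__':
    tasks = list(range(1, 11))
    try:
        with multiprocessing.Pool(POOL_SIZE) as pool:
            pool.map(sleep, tasks)  # the worker's exception is re-raised here
    except ValueError as exc:
        print("pool cleaned up after:", exc)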

How to use multiprocessing queue in Python?

My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You cannot really instantiate the object in each process, since they would be separate queues; how do you make sure that all processes relate to a shared queue (or in this case, queues)?

This is a simple example of a reader and writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends 'DONE', which lets the reader know to break out of the read loop.

You can spawn as many reader processes as you like...

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue. A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ### you can spawn as many reader_p() as you like
        ### however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs

if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print(" Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print(" reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")

