Python Multiprocessing Pool Hangs At Join

Python multiprocessing pool hangs at join?

Sorry to answer my own question, but I've found at least a workaround so in case anyone else has a similar issue I want to post it here. I'll accept any better answers out there.

I believe the root of the issue is http://bugs.python.org/issue9400 . This tells me two things:

  • I'm not crazy, what I'm trying to do really is supposed to work
  • At least in python2, it is very difficult if not impossible to pickle 'exceptions' back to the parent process. Simple ones work, but many others don't.

In my case, my worker function was launching a subprocess that was segfaulting. This returned CalledProcessError exception, which is not pickleable. For some reason, this makes the pool object in the parent go out to lunch and not return from the call to join().

In my particular case, I don't care what the exception was. At most I want to log it and keep going. To do this, I simply wrap my top worker function in a try/except clause. If the worker throws any exception, it is caught before trying to return to the parent process, logged, and then the worker process exits normally since it's no longer trying to send the exception through. See below:

def process_file_wrapped(filenamen, foo, bar, baz=biz):
try:
process_file(filename, foo, bar, baz=biz)
except:
print('%s: %s' % (filename, traceback.format_exc()))

Then, I have my initial map function call process_file_wrapped() instead of the original one. Now my code works as intended.

Python multiprocessing hanging on pool.join()

@Anarkopsykotik is correct: you must use a main, and you can get it to print by returning a result to the main thread.

Here's a working example.

import multiprocessing
import os

def test_function(arg1=1,arg2=2):
string="arg1 = {0}, arg2 = {1}".format(arg1,arg2) +" from process id: "+ str(os.getpid())
return string

if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
for i in range(6):
result = pool.apply_async(test_function)
print(result.get(timeout=1))
pool.close()
pool.join()

python multiprocessing - process hangs on join for large queue

The qout queue in the subprocess gets full. The data you put in it from foo() doesn't fit in the buffer of the OS's pipes used internally, so the subprocess blocks trying to fit more data. But the parent process is not reading this data: it is simply blocked too, waiting for the subprocess to finish. This is a typical deadlock.

Hang during pool.join() asynchronously processing a queue

Your issue happens because you do not process the Queue before the join call.
When you are using a multiprocessing.Queue, you should empty it before trying to join the feeder process. The Process wait for all the object put in the Queue to be flushed before terminating. I don't know why it is the case even for Queue with large size but it might be linked to the fact that the underlying os.pipe object do not have a size large enough.
So putting your get call before the pool.join should solve your problem.

PROCESS_COUNT = 6

def filter(aBigList):
list_chunks = list(chunks(aBigList, PROCESS_COUNT))
pool = multiprocessing.Pool(processes=PROCESS_COUNT)
result_queue = multiprocessing.Queue()
async_result = []
for chunk in list_chunks:
async_result.append(pool.apply_async(
func1, (chunk, result_queue)))

done = 0
while done < 3:
res = queue.get()
if res == None:
done += 1
else:
all_filtered.append(res)

pool.close()
pool.join()

# do work with allFiltered

def func1(sub_list, result_queue):
# mapping function
results = []
for i in sub_list:
result_queue.append(updateDict(i))

result_queue.append(None)

One question is why do you need to handle the communication by yourself? you could just let the Pool manage that for you if you re factor:

PROCESS_COUNT = 6

def filter(aBigList):
list_chunks = list(chunks(aBigList, PROCESS_COUNT))
pool = multiprocessing.Pool(processes=PROCESS_COUNT)
async_result = []
for chunk in list_chunks:
async_result.append(pool.apply_async(func1, (chunk,)))

pool.close()
pool.join()

# Reduce the result
allFiltered = [res.get() for res in async_result]

# do work with allFiltered

def func1(sub_list):
# mapping function
results = []
for i in sub_list:
results.append(updateDict(i))
return results

This permits to avoid this kind of bug.

EDIT
Finally, you can even reduce your code even further by using the Pool.map function, which even handle chunksize.
If your chunks gets too big, you might get error in the pickling process of the results (as stated in your comment). You can thus reduce adapt the size of the chink using map:

PROCESS_COUNT = 6

def filter(aBigList):
# Run in parallel a internal function of mp.Pool which run
# UpdateDict on chunk of 100 item in aBigList and return them.
# The map function takes care of the chunking, dispatching and
# collect the items in the right order.
with multiprocessing.Pool(processes=PROCESS_COUNT) as pool:
allFiltered = pool.map(updateDict, aBigList, chunksize=100)

# do work with allFiltered


Related Topics



Leave a reply



Submit