Python multiprocessing pool hangs at join?
Sorry to answer my own question, but I've found at least a workaround so in case anyone else has a similar issue I want to post it here. I'll accept any better answers out there.
I believe the root of the issue is http://bugs.python.org/issue9400. This tells me two things:
- I'm not crazy; what I'm trying to do really is supposed to work
- At least in Python 2, it is very difficult if not impossible to pickle exceptions back to the parent process. Simple ones work, but many others don't.
In my case, my worker function was launching a subprocess that was segfaulting. This raised a CalledProcessError exception, which is not picklable. For some reason, this makes the pool object in the parent go out to lunch and never return from the call to join().

In my particular case, I don't care what the exception was. At most I want to log it and keep going. To do this, I simply wrap my top-level worker function in a try/except clause. If the worker throws any exception, it is caught before it tries to return to the parent process, logged, and then the worker process exits normally, since it's no longer trying to send the exception through. See below:
import traceback

def process_file_wrapped(filename, foo, bar, baz=None):
    try:
        process_file(filename, foo, bar, baz=baz)
    except Exception:
        print('%s: %s' % (filename, traceback.format_exc()))
Then, I have my initial map function call process_file_wrapped() instead of the original one. Now my code works as intended.
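To illustrate the pattern end to end, here is a minimal, self-contained sketch. The names process_file and the simulated failure are hypothetical stand-ins, not the original code; the point is only that the worker swallows and logs its own exceptions, so nothing unpicklable ever has to travel back through the pool.

```python
import traceback
from multiprocessing import Pool

def process_file(path):
    # Hypothetical worker: fail on one input to simulate a crash.
    if path == "bad.txt":
        raise RuntimeError("simulated failure in %s" % path)
    return "ok: %s" % path

def process_file_wrapped(path):
    # Catch everything inside the worker so nothing unpicklable
    # ever has to be sent back to the parent process.
    try:
        return process_file(path)
    except Exception:
        print("%s: %s" % (path, traceback.format_exc()))
        return None

if __name__ == "__main__":
    with Pool(2) as pool:
        results = pool.map(process_file_wrapped, ["a.txt", "bad.txt", "c.txt"])
    # The failed file simply yields None instead of hanging the pool.
    print(results)
```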
Python multiprocessing hanging on pool.join()
@Anarkopsykotik is correct: you must use an `if __name__ == '__main__':` guard, and you can get it to print by returning a result to the main thread.
Here's a working example.
import multiprocessing
import os

def test_function(arg1=1, arg2=2):
    string = "arg1 = {0}, arg2 = {1}".format(arg1, arg2) + " from process id: " + str(os.getpid())
    return string

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in range(6):
        result = pool.apply_async(test_function)
        print(result.get(timeout=1))
    pool.close()
    pool.join()
python multiprocessing - process hangs on join for large queue
The qout queue in the subprocess gets full. The data you put in it from foo() doesn't fit in the buffer of the OS's pipes used internally, so the subprocess blocks trying to fit more data. But the parent process is not reading this data: it is simply blocked too, waiting for the subprocess to finish. This is a typical deadlock.
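A minimal sketch of the fix, with illustrative names rather than the original code: the parent must drain the queue before calling join(), because the child cannot exit while its unread data is stuck in the pipe buffer.

```python
from multiprocessing import Process, Queue

def foo(q, n):
    # Producer: push enough data to overflow a typical pipe buffer
    # (n * 1000 bytes here), then a None sentinel to signal "done".
    for _ in range(n):
        q.put("x" * 1000)
    q.put(None)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=foo, args=(q, 1000))
    p.start()
    # Drain the queue BEFORE joining. Calling p.join() first would
    # deadlock once the underlying pipe buffer fills up.
    items = []
    while True:
        item = q.get()
        if item is None:
            break
        items.append(item)
    p.join()
    print(len(items))  # prints 1000
```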
Hang during pool.join() asynchronously processing a queue
Your issue happens because you do not process the Queue before the join call.

When you are using a multiprocessing.Queue, you should empty it before trying to join the feeder process. The Process waits for all the objects put in the Queue to be flushed before terminating. I don't know why this is the case even for a Queue with a large size, but it may be linked to the fact that the underlying os.pipe object does not have a large enough buffer.

So putting your get calls before the pool.join should solve your problem.
PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    result_queue = multiprocessing.Queue()
    async_result = []
    for chunk in list_chunks:
        async_result.append(pool.apply_async(
            func1, (chunk, result_queue)))

    # Drain the queue BEFORE joining the pool
    all_filtered = []
    done = 0
    while done < len(list_chunks):
        res = result_queue.get()
        if res is None:
            done += 1
        else:
            all_filtered.append(res)

    pool.close()
    pool.join()
    # do work with all_filtered

def func1(sub_list, result_queue):
    # mapping function
    for i in sub_list:
        result_queue.put(updateDict(i))
    result_queue.put(None)
One question is why you need to handle the communication by yourself. You could just let the Pool manage that for you if you refactor:
PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    async_result = []
    for chunk in list_chunks:
        async_result.append(pool.apply_async(func1, (chunk,)))
    pool.close()
    pool.join()

    # Reduce the result
    allFiltered = [res.get() for res in async_result]
    # do work with allFiltered

def func1(sub_list):
    # mapping function
    results = []
    for i in sub_list:
        results.append(updateDict(i))
    return results
This avoids this kind of bug entirely.
EDIT
Finally, you can reduce your code even further by using the Pool.map function, which also handles the chunking for you.
If your chunks get too big, you might get errors while pickling the results (as stated in your comment). You can then adapt the chunk size using map's chunksize argument:
PROCESS_COUNT = 6

def filter(aBigList):
    # Run in parallel an internal function of mp.Pool which runs
    # updateDict on chunks of 100 items from aBigList and returns them.
    # The map function takes care of the chunking, dispatching and
    # collecting the items in the right order.
    with multiprocessing.Pool(processes=PROCESS_COUNT) as pool:
        allFiltered = pool.map(updateDict, aBigList, chunksize=100)
    # do work with allFiltered