Multiprocessing.Pool Spawning New Children After Terminate() on Linux/Python 2.7


Jesse Noller, the author of the multiprocessing module, has shown that the correct way to handle KeyboardInterrupt is to have the subprocesses return -- not re-raise the exception. This allows the main process to terminate the pool.

However, as the code below shows, the main process does not reach the except KeyboardInterrupt block until after all the tasks generated by pool.map have been run. This is why (I believe) you are seeing extra calls to your worker function, run_nlin, after Ctrl-C has been pressed.

One possible workaround is to have each worker function test whether a multiprocessing.Event has been set. If the event has been set, the worker bails out early; otherwise, it goes ahead with the long calculation.


import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warn("Running worker({x!r})".format(x=x))
            time.sleep(3)
        else:
            logger.warn("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warn("terminating is set")
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating,))
    params = range(12)
    try:
        logger.warn("starting pool runs")
        result = pool.map(worker, params)
        pool.close()
    except KeyboardInterrupt:
        logger.warn("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warn('done: {r}'.format(r=result))

if __name__ == '__main__':
    main()

Running the script yields:

% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)

Here Ctrl-C is pressed. Each of the workers sets the terminating event; we really only need one to set it, but the redundancy is harmless.

  C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set

Now all the other tasks queued by pool.map are run:

[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!

Finally the main process reaches the except KeyboardInterrupt block.

[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []

Python Multiprocessing - Too Slow

You can use the Pool primitives to solve your problem. You don't need to share an Event object, whose access is synchronised and therefore slow.

Here is an example of how to terminate a Pool once a worker produces the desired result.

You can simply signal the Pool by returning a specific value, and terminate the pool within a callback.
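
A minimal sketch of the pattern; the worker, its -1 sentinel value, and the callback name are hypothetical, not from the question:

import multiprocessing
import time

def worker(x):
    # Hypothetical task: -1 is the sentinel meaning "stop the pool".
    time.sleep(0.1)
    if x == 5:
        return -1
    return x * x

def main():
    pool = multiprocessing.Pool()

    def stop_on_sentinel(value):
        # The callback runs in a thread of the main process, so it can
        # terminate the pool as soon as a worker reports the sentinel.
        if value == -1:
            pool.terminate()

    for n in range(20):
        pool.apply_async(worker, (n,), callback=stop_on_sentinel)

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Because the pool is terminated from the callback, tasks still waiting in the queue are discarded instead of being handed to the workers.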

Kill Python Multiprocessing Pool

SIGQUIT (Ctrl + \) will kill all processes even under Python 2.x.

You can also update to Python 3.x, where this behavior (only the child getting the signal) seems to have been fixed.
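
If you need the same effect from code rather than from the keyboard, a minimal sketch (assuming a POSIX system) is to deliver SIGQUIT to the whole process group:

import os
import signal

# Send SIGQUIT to our entire process group -- the same thing the
# terminal does for Ctrl+\ -- so the pool workers receive it too.
os.killpg(os.getpgid(0), signal.SIGQUIT)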

Keyboard Interrupts with Python's multiprocessing Pool

This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)   # waits with no timeout; Ctrl-C is never delivered here
print "done"      # never reached

The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.
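
For context, here is a self-contained sketch of the workaround; slowly_square is a hypothetical stand-in for the question's worker:

import multiprocessing
import time

def slowly_square(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    try:
        # get() with a timeout waits on the condition with a deadline,
        # so Ctrl-C is delivered promptly, unlike a bare pool.map().
        results = pool.map_async(slowly_square, range(40)).get(9999999)
        print(results)
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()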

Failures with Python multiprocessing.Pool when maxtasksperchild is set

maxtasksperchild causes multiprocessing to respawn child processes. The idea is to get rid of any cruft that is building up. The problem is that you can also pick up new cruft from the parent: when a child respawns, it inherits the current state of the parent process, which is different from the state at the original spawn. You are doing your work in the script's global namespace, so you are changing the environment the child will see quite a bit. Specifically, you use a variable called 'count' that masks an earlier 'from itertools import count' statement.

To fix this:

  1. Use namespaces (itertools.count, as you said in the comment) to reduce name collisions.

  2. Do your work inside a function so that local variables aren't propagated to the child. A sketch of the collision follows.
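
A minimal sketch of the kind of collision described, assuming fork-based process creation on Linux; the worker function and the loop are hypothetical:

from itertools import count

def worker(n):
    # Pool tasks expect `count` to be itertools.count here.
    return next(count(n))

# Work done later in the script's global namespace rebinds the name:
count = 0                      # shadows itertools.count in the parent
for item in ['a', 'b', 'c']:
    count += 1

# A child respawned by maxtasksperchild forks from this state, so any
# later task that calls worker() fails with
#     TypeError: 'int' object is not callable
# Moving the counting loop into a function keeps the rebinding local,
# so it never reaches the respawned child's globals.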

Python Multiprocessing: Handling Child Errors in Parent

I don't know of a standard practice, but what I've found is that to make multiprocessing reliable I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically, what I do is the following (a sketch appears after the list):

  • Subclass multiprocessing.Process or write functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary).
  • Always provide a shared error multiprocessing.Queue from the main process to each worker process.
  • Enclose the entire run code in a try: ... except Exception as e. Then, when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context. The original context is really important if you want to log useful information in the main process.
  • Of course, handle expected issues as normal within the worker's normal operation.
  • (Similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop:
    • Define a stop token in the class or for the functions.
    • When the main process wants the worker(s) to stop, just send the stop token; to stop everyone, send enough tokens for all the processes.
    • The wrapping loop checks the input queue for the token or for whatever other input you want.
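
A minimal sketch of that pattern; the Worker class, the STOP token, and the handle() method are hypothetical names, not from the question:

import multiprocessing
import traceback

STOP = 'STOP'  # hypothetical stop token

class Worker(multiprocessing.Process):
    def __init__(self, task_queue, error_queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue
        self.error_queue = error_queue

    def run(self):
        try:
            while True:                       # the wrapping loop
                task = self.task_queue.get()
                if task == STOP:              # stop token: exit cleanly
                    break
                self.handle(task)             # handle expected issues inside
        except Exception:
            # Something unexpected: send an error package with the
            # process id and the original traceback to the main process.
            self.error_queue.put((self.pid, traceback.format_exc()))

    def handle(self, task):
        pass  # hypothetical per-task work

if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    errors = multiprocessing.Queue()
    workers = [Worker(tasks, errors) for _ in range(4)]
    for w in workers:
        w.start()
    for item in range(10):
        tasks.put(item)
    for _ in workers:                          # one stop token per worker
        tasks.put(STOP)
    for w in workers:
        w.join()
    while not errors.empty():
        pid, tb = errors.get()
        print('worker %s died:\n%s' % (pid, tb))

Passing the formatted traceback as a string sidesteps the fact that traceback objects themselves do not pickle.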

The end result is worker processes that can survive for a long time and that let you know what is happening when something goes wrong. They die quietly, since you can handle whatever you need to after the catch-all exception, and you will also know when you need to restart a worker.

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?


