multiprocessing.Pool spawning new children after terminate() on Linux/Python2.7?
On this page, Jesse Noller, author of the multiprocessing module, shows that the correct way to handle KeyboardInterrupt is to have the subprocesses return -- not re-raise the exception. This allows the main process to terminate the pool. However, as the code below shows, the main process does not reach the except KeyboardInterrupt block until after all the tasks generated by pool.map have been run. This is why (I believe) you are seeing extra calls to your worker function, run_nlin, after Ctrl-C has been pressed.
One possible workaround is to have all the worker functions test whether a multiprocessing.Event has been set. If the event has been set, the worker bails out early; otherwise, it goes ahead with the long calculation.
import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warn("Running worker({x!r})".format(x=x))
            time.sleep(3)
        else:
            logger.warn("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warn("terminating is set")
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating,))
    params = range(12)
    try:
        logger.warn("starting pool runs")
        result = pool.map(worker, params)
        pool.close()
    except KeyboardInterrupt:
        logger.warn("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warn('done: {r}'.format(r=result))

if __name__ == '__main__':
    main()
Running the script yields:
% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)
Here Ctrl-C is pressed; each of the workers sets the terminating event. We really only need one to set it, but this works despite the small inefficiency.
C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set
Now all the other tasks queued by pool.map are run:
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!
Finally the main process reaches the except KeyboardInterrupt block.
[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
Python Multiprocessing - Too Slow
You can use the Pool primitives to solve your problem. You don't need to share an Event object, whose access is synchronised and therefore slow.
Here I give an example of how to terminate a Pool given the desired result from a worker.
You can simply signal the Pool by returning a specific value and terminate the pool within a callback.
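A minimal sketch of that idea, with all names invented for illustration (worker, the ('found', x) sentinel value, run_until_found): the worker returns a specific value, the callback - which runs in a thread of the main process - signals the main thread, and the main thread terminates the pool.

```python
import multiprocessing
import threading
import time

def worker(x):
    # Simulate some work; x == 5 stands in for "the desired result".
    time.sleep(0.05)
    return ('found', x) if x == 5 else ('partial', x)

def run_until_found():
    pool = multiprocessing.Pool(2)
    found = []
    done = threading.Event()

    def on_result(result):
        # Callbacks run in the main process, so they can safely
        # signal the main thread to shut the pool down.
        status, value = result
        if status == 'found':
            found.append(value)
            done.set()

    for x in range(20):
        pool.apply_async(worker, (x,), callback=on_result)
    pool.close()
    done.wait()        # block until a worker reports the desired result
    pool.terminate()   # cancel everything still queued or running
    pool.join()
    return found

if __name__ == '__main__':
    print(run_until_found())
```

Terminating from the main thread rather than inside the callback itself sidesteps any interaction between terminate() and the pool's internal result-handler thread.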
Kill Python Multiprocessing Pool
SIGQUIT (Ctrl + \) will kill all processes even under Python 2.x.
You can also update to Python 3.x, where this behavior (only child gets the signal) seems to have been fixed.
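As a POSIX-only sketch (the run_with_sigquit_cleanup helper and worker function are invented for illustration), the parent can install its own SIGQUIT handler after creating the pool; the workers then keep the default disposition (die on SIGQUIT) while the parent gets a chance to shut down cleanly:

```python
import multiprocessing
import os
import signal
import time

def worker(x):
    time.sleep(0.01)
    return x

def run_with_sigquit_cleanup():
    pool = multiprocessing.Pool(2)

    def on_quit(signum, frame):
        # Parent-side cleanup: kill the workers, then exit.
        pool.terminate()
        pool.join()
        raise SystemExit(1)

    # Installed *after* the fork, so only the parent runs this handler;
    # the workers keep the default behaviour (die on SIGQUIT), which is
    # why Ctrl + \ tears the whole process group down.
    signal.signal(signal.SIGQUIT, on_quit)
    pool.map_async(worker, range(100))
    os.kill(os.getpid(), signal.SIGQUIT)  # simulate pressing Ctrl + \
```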
Keyboard Interrupts with python's multiprocessing Pool
This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.
Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace
results = pool.map(slowly_square, range(40))
with
results = pool.map_async(slowly_square, range(40)).get(9999999)
or similar.
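A self-contained sketch of the workaround in context (slowly_square is from the question; the pool size and sleep are arbitrary):

```python
import multiprocessing
import time

def slowly_square(x):
    time.sleep(0.01)
    return x * x

def main():
    pool = multiprocessing.Pool(4)
    try:
        # get() with a (huge) timeout makes the wait interruptible:
        # a timed Condition.wait() still delivers KeyboardInterrupt.
        results = pool.map_async(slowly_square, range(40)).get(9999999)
    except KeyboardInterrupt:
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()
    return results

if __name__ == '__main__':
    print(main()[:5])  # -> [0, 1, 4, 9, 16]
```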
Failures with Python multiprocessing.Pool when maxtasksperchild is set
maxtasksperchild causes multiprocessing to respawn child processes. The idea is to get rid of any cruft that is building up. The problem is, you can get new cruft from the parent. When the child respawns, it gets the current state of the parent process, which is different from the original spawn. You are doing your work in the script's global namespace, so you are changing the environment the child will see quite a bit. Specifically, you use a variable called 'count' that masks a previous 'from itertools import count' statement.
To fix this:
- use namespaces (itertools.count, like you said in the comment) to reduce name collisions
- do your work in a function so that local variables aren't propagated to the child.
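A minimal sketch of both fixes together (the work function and the counter are invented for illustration): the qualified itertools.count lives in a local variable inside main(), so nothing in the module's global namespace changes between the original spawn and a maxtasksperchild respawn.

```python
import itertools
import multiprocessing

def work(x):
    return x * 2

def main():
    # All mutable state lives inside main(), so a local name can no
    # longer shadow a module-level import the way a global `count`
    # masked `from itertools import count` in the question.
    counter = itertools.count(1)
    batch = next(counter)          # local and qualified: no collision
    with multiprocessing.Pool(processes=2, maxtasksperchild=1) as pool:
        results = pool.map(work, range(8))
    return batch, results

if __name__ == '__main__':
    print(main())  # -> (1, [0, 2, 4, 6, 8, 10, 12, 14])
```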
Python Multiprocessing: Handling Child Errors in Parent
I don't know of a standard practice, but what I've found is that for reliable multiprocessing I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).
Specifically what I do is:
- Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
- always provide a shared error multiprocessing.Queue from the main process to each worker process
- enclose the entire run code in a try: ... except Exception as e. Then when something unexpected happens, send an error package with:
  - the process id that died
  - the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
- of course handle expected issues as normal within the normal operation of the worker
- (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop:
  - define a stop token in the class or for functions
  - when the main process wants the worker(s) to stop, just send the stop token; to stop everyone, send enough tokens for all the processes
  - the wrapping loop checks the input queue for the token or whatever other input you want
The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.
Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?
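A condensed sketch of the pattern described above (the Worker class, the STOP token, and the doubling task are all invented for illustration):

```python
import multiprocessing
import traceback

STOP = 'STOP'  # hypothetical stop token; any unique sentinel works

class Worker(multiprocessing.Process):
    def __init__(self, task_q, result_q, error_q):
        super().__init__()
        self.task_q = task_q
        self.result_q = result_q
        self.error_q = error_q

    def run(self):
        # Catch-all so the parent always learns why a worker died.
        try:
            while True:
                task = self.task_q.get()
                if task == STOP:          # stop token ends the loop
                    break
                self.result_q.put(task * 2)
        except Exception:
            # Ship the pid plus the formatted traceback; the exception
            # object alone loses its original context across processes.
            self.error_q.put((self.pid, traceback.format_exc()))

def main():
    task_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    error_q = multiprocessing.Queue()
    w = Worker(task_q, result_q, error_q)
    w.start()
    for x in range(3):
        task_q.put(x)
    task_q.put(STOP)                      # one token per worker process
    w.join()
    results = sorted(result_q.get() for _ in range(3))
    return results, error_q.empty()

if __name__ == '__main__':
    print(main())  # -> ([0, 2, 4], True)
```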