How to kill a process using the multiprocessing module?
You could do it by sharing state between the processes: create a flag value that all the concurrent processes can access (although polling a flag this way may be somewhat inefficient).
Here's what I'm suggesting:
import multiprocessing as mp
import time

def action(run_flag):
    x = 0
    while run_flag.value:
        if x < 1000000:
            x = x + 1
        else:
            x = 0
    print('action() terminating')

def timer(run_flag, secs):
    time.sleep(secs)
    run_flag.value = False

if __name__ == '__main__':
    run_flag = mp.Value('I', True)
    loop_process = mp.Process(target=action, args=(run_flag,))
    loop_process.start()
    timer_process = mp.Process(target=timer, args=(run_flag, 2.0))
    timer_process.start()
    loop_process.join()
    timer_process.join()
    print('done')
Terminate a Python multiprocessing program once a one of its workers meets a certain condition
No process can stop another short of brute-force os.kill()-like sledgehammers. Don't go there.
To do this sanely, you need to rework your basic approach: the main process and the worker processes need to communicate with each other.
I'd flesh it out, but the example so far is too bare-bones to make it useful. For example, as written, no more than num_workers calls to rand() are ever made, so there's no reason to believe any of them must be > 0.7.
Once the worker function grows a loop, then it becomes more obvious. For example, the worker could check to see if an mp.Event is set at the top of the loop, and just exit if it is. The main process would set the Event when it wants the workers to stop.

And a worker could set a different mp.Event when it found a value > 0.7. The main process would wait for that Event, then set the "time to stop" Event for workers to see, then do the usual loop of .join()-ing the workers for a clean shutdown.
EDIT
Here's fleshing out a portable, clean solution, assuming the workers are going to keep going until at least one finds a value > 0.7. Note that I removed numpy from this, because it's irrelevant to this code. The code here should work fine under any stock Python on any platform supporting multiprocessing:
import random
from time import sleep

def worker(i, quit, foundit):
    print("%d started" % i)
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print('%d found %g' % (i, x))
            foundit.set()
            break
        sleep(0.1)
    print("%d is done" % i)

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    procs = []
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
        procs.append(p)
    foundit.wait()
    quit.set()
    for p in procs:  # join the workers for a clean shutdown
        p.join()
And some sample output:
0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done
Everything shuts down cleanly: no tracebacks, no abnormal terminations, no zombie processes left behind ... clean as a whistle.
KILLING IT
As @noxdafox pointed out, there's a Pool.terminate() method that does the best it can, across platforms, to kill worker processes no matter what they're doing (e.g., on Windows it calls the platform TerminateProcess()). I don't recommend it for production code, because killing a process abruptly can leave various shared resources in inconsistent states, or let them leak. There are various warnings about that in the multiprocessing docs, to which you should add your OS docs.
Still, it can be expedient! Here's a full program using this approach. Note that I bumped the cutoff to 0.95, to make this more likely to take longer than an eyeblink to run:
import random
from time import sleep

def worker(i):
    print("%d started" % i)
    while True:
        x = random.random()
        print('%d found %g' % (i, x))
        if x > 0.95:
            return x  # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print("quitting with %g" % arg)
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()
And some sample output:
$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]
What exactly is Python multiprocessing Module's .join() Method Doing?
The join() method, when used with threading or multiprocessing, is not related to str.join() - it's not actually concatenating anything together. Rather, it just means "wait for this [thread/process] to complete". The name join is used because the multiprocessing module's API is meant to look as similar as possible to the threading module's API, and the threading module uses join for its Thread object. Using the term join to mean "wait for a thread to complete" is common across many programming languages, so Python just adopted it as well.
Now, the reason you see the 20 second delay both with and without the call to join() is because by default, when the main process is ready to exit, it will implicitly call join() on all running multiprocessing.Process instances. This isn't as clearly stated in the multiprocessing docs as it should be, but it is mentioned in the Programming Guidelines section:
Remember also that non-daemonic processes will be joined automatically.
You can override this behavior by setting the daemon flag on the Process to True prior to starting the process:
p = Process(target=say_hello)
p.daemon = True
p.start()
# Both parent and child will exit here, since the main process has completed.
If you do that, the child process will be terminated as soon as the main process completes:
daemon
The process's daemon flag, a Boolean value. This must be set before start() is called. The initial value is inherited from the creating process.
When a process exits, it attempts to terminate all of its daemonic child processes.
Python multiprocessing: how to exit cleanly after an error?
There are two pieces to this puzzle.
1. How can I detect and kill all the child processes?
2. How can I make a best effort to ensure my code from part 1 is run whenever one process dies?
For part 1, you can use multiprocessing.active_children() to get a list of all the active children and kill them with Process.terminate(). Note that the use of Process.terminate() comes with the usual warnings.
from multiprocessing import Process
import multiprocessing

def f(name):
    print('hello', name)
    while True: pass

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()

    # At user input, terminate all processes.
    input("Press Enter to terminate: ")
    for p in multiprocessing.active_children():
        p.terminate()
One solution to part 2 is to use sys.excepthook, as described in this answer. Here is a combined example.
from multiprocessing import Process
import multiprocessing
import sys
from time import sleep

def f(name):
    print('hello', name)
    while True: pass

def myexcepthook(exctype, value, traceback):
    for p in multiprocessing.active_children():
        p.terminate()

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
    sys.excepthook = myexcepthook

    # Sleep for a bit and then force an exception by doing something stupid.
    sleep(1)
    1 / 0
Python multiprocessing finish the work correctly, but the processes still alive (Linux)
When the with ... as executor: block exits, there is an implicit call to executor.shutdown(wait=True). This waits until all pending futures are done executing "and the resources associated with the executor have been freed", which presumably includes terminating the processes in the pool (if possible). So it is a bit of a mystery why your futures all complete while the processes do not terminate; since you haven't provided the code for fun_job, it's hard to say more.
One thing you might try is switching to the multiprocessing.pool.Pool class from the multiprocessing module. It supports a terminate method, which is implicitly called when its with context-manager block exits, and which explicitly attempts to terminate all processes in the pool:
#import concurrent.futures
import multiprocessing
... # etc.

def compute_using_multi_processing(list_comb_ids, dict_ids_seqs):
    start = time.perf_counter()
    with multiprocessing.Pool(processes=nb_cpu) as executor:
        results = executor.map(help_fun_job,
                               [((pair_ids[0], dict_ids_seqs[pair_ids[0]]),
                                 (pair_ids[1], dict_ids_seqs[pair_ids[1]]))
                                for pair_ids in list_comb_ids])
        save_results_to_csv(results)
    finish = time.perf_counter()
    processing_time = str(datetime.timedelta(seconds=round(finish - start, 2)))
    print(f' Processing time Finished in {processing_time} hh:mm:ss')