Script Using Multiprocessing Module Does Not Terminate

How to kill a process using the multiprocessing module?

You could do it by sharing state between the processes and creating a flag value that all the concurrent processes can access (although this may be somewhat inefficient).

Here's what I'm suggesting:

import multiprocessing as mp
import time

def action(run_flag):
    x = 0
    while run_flag.value:     # keep working until the flag is cleared
        if x < 1000000:
            x = x + 1
        else:
            x = 0

    print('action() terminating')

def timer(run_flag, secs):
    time.sleep(secs)
    run_flag.value = False    # clear the shared flag after secs seconds

if __name__ == '__main__':

    run_flag = mp.Value('I', True)   # shared flag visible to both processes

    loop_process = mp.Process(target=action, args=(run_flag,))
    loop_process.start()

    timer_process = mp.Process(target=timer, args=(run_flag, 2.0))
    timer_process.start()

    loop_process.join()
    timer_process.join()

    print('done')
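
If you prefer, the same shutdown can be done with an mp.Event instead of a shared Value; this is just a sketch of that alternative (not part of the original answer), with the Event playing the role of the flag:

import multiprocessing as mp
import time

def action(stop_event):
    x = 0
    while not stop_event.is_set():   # keep working until told to stop
        x = (x + 1) % 1000000
    print('action() terminating')

if __name__ == '__main__':
    stop_event = mp.Event()

    loop_process = mp.Process(target=action, args=(stop_event,))
    loop_process.start()

    time.sleep(2.0)       # let the worker run for about 2 seconds
    stop_event.set()      # signal the worker to stop

    loop_process.join()
    print('done')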

Terminate a Python multiprocessing program once one of its workers meets a certain condition

No process can stop another short of brute force os.kill()-like sledgehammers. Don't go there.

To do this sanely, you need to rework your basic approach: the main process and the worker processes need to communicate with each other.

I'd flesh it out, but the example so far is too bare-bones to make it useful. For example, as written, no more than num_workers calls to rand() are ever made, so there's no reason to believe any of them must be > 0.7.

Once the worker function grows a loop, then it becomes more obvious. For example, the worker could check to see if an mp.Event is set at the top of the loop, and just exit if it is. The main process would set the Event when it wants the workers to stop.

And a worker could set a different mp.Event when it found a value > 0.7. The main process would wait for that Event, then set the "time to stop" Event for workers to see, then do the usual loop .join()-ing the workers for a clean shutdown.

EDIT

Here's a fleshed-out, portable, clean solution, assuming the workers keep going until at least one finds a value > 0.7. Note that I removed numpy, because it's irrelevant to this code. The code here should work fine under any stock Python on any platform supporting multiprocessing:

import random
from time import sleep

def worker(i, quit, foundit):
    print("%d started" % i)
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print('%d found %g' % (i, x))
            foundit.set()
            break
        sleep(0.1)
    print("%d is done" % i)

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    procs = []
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
        procs.append(p)
    foundit.wait()   # block until some worker finds a value > 0.7
    quit.set()       # tell all workers it's time to stop
    for p in procs:  # join the workers for a clean shutdown
        p.join()

And some sample output:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

Everything shuts down cleanly: no tracebacks, no abnormal terminations, no zombie processes left behind ... clean as a whistle.

KILLING IT

As @noxdafox pointed out, there's a Pool.terminate() method that does the best it can, across platforms, to kill worker processes no matter what they're doing (e.g., on Windows it calls the platform TerminateProcess()). I don't recommend it for production code, because killing a process abruptly can leave various shared resources in inconsistent states, or let them leak. There are various warnings about that in the multiprocessing docs, to which you should add your OS's documentation.

Still, it can be expedient! Here's a full program using this approach. Note that I bumped the cutoff to 0.95, to make this more likely to take longer than an eyeblink to run:

import random
from time import sleep

def worker(i):
    print("%d started" % i)
    while True:
        x = random.random()
        print('%d found %g' % (i, x))
        if x > 0.95:
            return x  # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print("quitting with %g" % arg)
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

And some sample output:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

What exactly is Python multiprocessing Module's .join() Method Doing?

The join() method, when used with threading or multiprocessing, is not related to str.join() - it doesn't concatenate anything. Rather, it simply means "wait for this [thread/process] to complete". The name join is used because the multiprocessing module's API is meant to look as similar as possible to the threading module's API, and the threading module uses join for its Thread objects. Using the term join to mean "wait for a thread to complete" is common across many programming languages, so Python simply adopted it as well.
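
As a minimal sketch of what that means in practice (the child function here is illustrative):

import multiprocessing as mp
import time

def child():
    time.sleep(3)                 # simulate some work
    print('child finished')

if __name__ == '__main__':
    p = mp.Process(target=child)
    p.start()
    print('waiting for the child...')
    p.join()                      # blocks for roughly 3 seconds
    print('join() returned; the child has completed')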

Now, the reason you see the 20 second delay both with and without the call to join() is because by default, when the main process is ready to exit, it will implicitly call join() on all running multiprocessing.Process instances. This isn't as clearly stated in the multiprocessing docs as it should be, but it is mentioned in the Programming Guidelines section:

Remember also that non-daemonic processes will be joined
automatically.

You can override this behavior by setting the daemon flag on the Process to True prior to starting the process:

p = Process(target=say_hello)
p.daemon = True
p.start()
# Both parent and child will exit here, since the main process has completed.

If you do that, the child process will be terminated as soon as the main process completes:

daemon

The process’s daemon flag, a Boolean value. This must be set before
start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic
child processes.
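
To see that effect in a self-contained form, here is a sketch (the body of say_hello is made up for illustration; the question's version may differ):

import multiprocessing as mp
import time

def say_hello():
    time.sleep(20)      # the daemonic child never finishes this sleep
    print('hello')      # never printed

if __name__ == '__main__':
    p = mp.Process(target=say_hello)
    p.daemon = True     # must be set before start()
    p.start()
    print('main exiting')   # the program ends immediately; the child is terminated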

Python multiprocessing: how to exit cleanly after an error?

There are two pieces to this puzzle.

  1. How can I detect and kill all the child processes?
  2. How can I make a best effort to ensure my code from part 1 is run whenever one process dies?

For part 1, you can use multiprocessing.active_children() to get a list of all the active children and kill them with Process.terminate(). Note that Process.terminate() comes with the usual warnings.

from multiprocessing import Process
import multiprocessing

def f(name):
    print('hello', name)
    while True: pass

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()

    # At user input, terminate all processes.
    input("Press Enter to terminate: ")
    for p in multiprocessing.active_children():
        p.terminate()

One solution to part 2 is to use sys.excepthook, as described in this answer. Here is a combined example.

from multiprocessing import Process
import multiprocessing
import sys
from time import sleep

def f(name):
    print('hello', name)
    while True: pass

def myexcepthook(exctype, value, traceback):
    for p in multiprocessing.active_children():
        p.terminate()

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
    sys.excepthook = myexcepthook

    # Sleep for a bit and then force an exception by doing something stupid.
    sleep(1)
    1 / 0

Python multiprocessing finishes the work correctly, but the processes are still alive (Linux)

When the with ... as executor: block exits, there is an implicit call to executor.shutdown(wait=True). This waits for all pending futures to finish executing "and the resources associated with the executor have been freed", which presumably includes terminating the processes in the pool (if possible?). Why your program terminates (or does it?), or at least why you say all the futures have completed while the processes have not terminated, is a bit of a mystery. But you haven't provided the code for fun_job, so who can say why this is so?
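
For reference, the with block is roughly equivalent to calling shutdown() yourself; here is a sketch (fun_job and its arguments are placeholders, not your actual code):

from concurrent.futures import ProcessPoolExecutor

def fun_job(x):           # placeholder worker function
    return x * x

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=4)
    try:
        results = list(executor.map(fun_job, range(10)))
    finally:
        # This is what `with ProcessPoolExecutor(...) as executor:` does on exit:
        # wait for pending futures, then free the executor's resources.
        executor.shutdown(wait=True)
    print(results)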

One thing you might try is switching to the multiprocessing.pool.Pool class from the multiprocessing module. It supports a terminate method, which is called implicitly when its with block exits and which explicitly attempts to terminate all processes in the pool:

#import concurrent.futures
import multiprocessing
... # etc.

def compute_using_multi_processing(list_comb_ids, dict_ids_seqs):
    start = time.perf_counter()

    with multiprocessing.Pool(processes=nb_cpu) as executor:
        results = executor.map(help_fun_job,
                               [((pair_ids[0], dict_ids_seqs[pair_ids[0]]), (pair_ids[1], dict_ids_seqs[pair_ids[1]]))
                                for pair_ids in list_comb_ids])

    save_results_to_csv(results)

    finish = time.perf_counter()

    processing_time = str(datetime.timedelta(seconds=round(finish - start, 2)))
    print(f' Processing time Finished in {processing_time} hh:mm:ss')
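
If you want to confirm that the pool's worker processes really are gone after the with block, a quick check (my addition, not part of the answer above, with a made-up stand-in for help_fun_job) is:

import multiprocessing

def square(x):            # hypothetical stand-in for help_fun_job
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(10))

    # The with block called pool.terminate(), so normally no pool workers remain.
    print(multiprocessing.active_children())   # expect []
    print(results)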

