Python Multiprocessing: Handling Child Errors in Parent

I don't know the standard practice, but what I've found is that to get reliable multiprocessing I design the methods/classes specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
  • always provide a shared error multiprocessing.Queue from the main process to each worker process
  • enclose the entire run code in a try: ... except Exception as e block. Then when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context. The original context is really important if you want to log useful information in the main process.
  • of course, handle expected issues as normal within the normal operation of the worker
  • (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • define a stop token in the class or for functions
    • when the main process wants the worker(s) to stop, just send the stop token; to stop everyone, send enough tokens for all the processes
    • the wrapping loop checks the input queue for the token or whatever other input you want

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.
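Put together, the bullets above can be sketched roughly as follows. This is a minimal sketch, not a standard API: the STOP token, the worker function, and the queue names are all illustrative.

```python
import multiprocessing
import traceback

STOP = "STOP"  # illustrative stop token; any unique sentinel works

def worker(in_q, err_q):
    """Long-running worker: loops over the input queue until it sees STOP."""
    try:
        while True:
            item = in_q.get()
            if item == STOP:
                break  # the main process asked us to shut down
            # expected problems are handled here, inside normal operation
            if not isinstance(item, int):
                raise TypeError(f"unexpected item: {item!r}")
            # ... do the real work with item ...
    except Exception:
        # something unexpected: report who died and the original traceback
        err_q.put((multiprocessing.current_process().pid,
                   traceback.format_exc()))

if __name__ == "__main__":
    in_q = multiprocessing.Queue()
    err_q = multiprocessing.Queue()  # shared error queue
    p = multiprocessing.Process(target=worker, args=(in_q, err_q))
    p.start()
    in_q.put("oops")  # triggers the catch-all in the worker
    p.join()
    if not err_q.empty():
        pid, tb = err_q.get()
        print(f"worker {pid} died with:\n{tb}")  # decide here whether to restart
```

To stop N healthy workers, put N STOP tokens on in_q; each worker consumes exactly one before exiting its loop.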

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?

How to catch exceptions thrown by functions executed using multiprocessing.Process() (python)

This can be achieved by overriding the run() method of the multiprocessing.Process class with a try..except statement and setting up a Pipe() to get any exception raised in the child process and store it in an instance field, exposed as the exception property:

#!/usr/bin/env python3
import multiprocessing, traceback, time

class Process(multiprocessing.Process):

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            #raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

# this function will be executed in a child process asynchronously
def failFunction():
    raise RuntimeError('trust fall, catch me!')

# execute failFunction() in a child process in the background
process = Process(
    target = failFunction,
)
process.start()

# <this is where async stuff would happen>
time.sleep(1)

# catch the child process' exception; note that the exception property
# holds an (exception, traceback_string) tuple, so unpack it before raising
try:
    process.join()
    if process.exception:
        error, tb = process.exception
        raise error
except Exception as e:
    print("Exception caught!")

Example execution:

user@host:~$ python3 example.py 
Exception caught!
user@host:~$

Solution taken from this answer:

  • https://stackoverflow.com/a/33599967/1174102

Catch child process exception in parent process

You cannot do that directly with bare Process objects. multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor both allow it: the child's exception is re-raised in the parent when you retrieve the result.

pool = multiprocessing.Pool()
task = pool.apply_async(run_crawler_process, (Config.CRAWLER_NAME, locations, city_payloads_map, cycle_count))

try:
    task.get()
except Exception as error:
    print("Error while processing task: %s" % error)
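The same pattern with concurrent.futures looks like this. A minimal sketch: crawl is a hypothetical stand-in for run_crawler_process, which isn't shown in the question; Future.result() re-raises the child's exception in the parent.

```python
from concurrent.futures import ProcessPoolExecutor

def crawl(name):
    # hypothetical stand-in for the real run_crawler_process; always fails here
    raise RuntimeError(f"crawler {name} failed")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(crawl, "demo")
        try:
            future.result()  # blocks, then re-raises the child's exception here
        except Exception as error:
            print("Error while processing task: %s" % error)
```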

In python, how can I check if a child process exited cleanly or with an error?

Read the value of the process's exitcode attribute:

  • None if the process has not finished yet.
  • 0 if the process ended successfully.
  • N if the process had an error, and exited with code N.
  • -N if the process was killed with the signal N. (eg. -15 if killed by SIGTERM)

For example, in your main process:

for p in processes:
    p.join()
    if p.exitcode != 0:  # nonzero also covers negative (signal) exit codes
        raise ValueError(f'a process exited with code {p.exitcode}')

And in your runnable:

try:
    do_something()
except KnownError as _:
    exit(my_known_error_code)
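Here is a runnable sketch of those exitcode values; the function names are illustrative.

```python
import multiprocessing
import sys
import time

def succeed():
    pass               # returns normally -> exitcode 0

def fail():
    sys.exit(3)        # explicit error code -> exitcode 3

def hang():
    time.sleep(60)     # will be killed by the parent

if __name__ == "__main__":
    for target in (succeed, fail):
        p = multiprocessing.Process(target=target)
        p.start()
        p.join()
        print(target.__name__, p.exitcode)

    p = multiprocessing.Process(target=hang)
    p.start()
    p.terminate()      # sends SIGTERM on POSIX
    p.join()
    print(hang.__name__, p.exitcode)  # -15 on POSIX (killed by signal 15)
```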

