How to Recover the Return Value of a Function Passed to Multiprocessing.Process

How can I recover the return value of a function passed to multiprocessing.Process?

Use a shared variable to communicate. For example:

import multiprocessing

def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
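
If the results are plain numbers, a shared multiprocessing.Array works without the extra Manager process. A minimal sketch of that variant (the square worker is just a placeholder for illustration):

import multiprocessing

def square(procnum, results):
    # write this process's result into its own slot of the shared array
    results[procnum] = procnum * procnum

if __name__ == "__main__":
    results = multiprocessing.Array("i", 5)  # shared array of 5 C ints, zero-initialized
    jobs = [multiprocessing.Process(target=square, args=(i, results)) for i in range(5)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    print(list(results))  # [0, 1, 4, 9, 16]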

Getting the return value of a function used in multiprocess

Answer

from multiprocessing import Process, Queue

def my_func(q, arg):
    # hand the result back to the parent through the queue
    q.put('Hello, ' + arg)

if __name__ == "__main__":
    Q = Queue()
    p1 = Process(target=my_func, args=(Q, 'John'))
    p1.start()
    print(Q.get())  # 'Hello, John'
    p1.join()

How to get the return value of a function in multiprocessing Python code

Use multiprocessing.Pool when you want to retrieve return values.

from multiprocessing import Pool

def print_cube(num):
    aa1 = num * num * num
    return aa1

def main():
    with Pool(5) as p:
        results = p.map(print_cube, range(10, 15))
    print(results)  # [1000, 1331, 1728, 2197, 2744]

if __name__ == "__main__":
    main()
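
If you want to submit tasks one at a time instead of mapping over a sequence, Pool.apply_async returns an AsyncResult whose .get() yields the return value. A short sketch reusing the same print_cube function:

from multiprocessing import Pool

def print_cube(num):
    return num * num * num

if __name__ == "__main__":
    with Pool(5) as p:
        # submit each task individually and keep the AsyncResult handles
        handles = [p.apply_async(print_cube, (n,)) for n in range(10, 15)]
        print([h.get() for h in handles])  # [1000, 1331, 1728, 2197, 2744]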

Fastest way to recover the return value of a function passed to multiprocessing.Process

Runtime sequential:

processing time * number of runs

Runtime parallel:

(processing time) * (number of runs / number of parallel processes)
+ (process start overhead * number of parallel processes)

If your processing time isn't long to begin with, the overhead of starting new processes will outweigh the savings you get from parallelism.

If your sequential batch takes ~300 seconds, a parallel implementation on 30 processes will probably take under 11 seconds; but if your sequential batch only takes 10 seconds, running it on 30 processes may actually take longer.
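
As a rough sanity check, you can plug your own numbers into the formulas above before parallelizing anything. A sketch (the 0.1 s per-process start-up overhead is an assumed figure; measure it on your machine):

def estimate(processing_time, runs, n_procs, start_overhead=0.1):
    """Back-of-the-envelope runtimes (seconds) from the formulas above."""
    sequential = processing_time * runs
    parallel = processing_time * (runs / n_procs) + start_overhead * n_procs
    return sequential, parallel

# 30 runs of 10 s each on 30 processes: parallelism clearly wins
print(estimate(10, 30, 30))    # (300, 13.0)
# 30 runs of 0.05 s each: process start-up costs more than it saves
print(estimate(0.05, 30, 30))  # (1.5, 3.05)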

How to get function return value from a multiprocessing.Process?

You can also use the concurrent.futures module:

import concurrent.futures
_pool = concurrent.futures.ThreadPoolExecutor()

def func1():
    for i in range(0, 100000000):
        pass
    return 'abc'

def func2():
    for i in range(0, 100000000):
        pass
    return 'xyz'

if __name__ == '__main__':
    p1 = _pool.submit(func1)
    p2 = _pool.submit(func2)

    print(p1.result(), p2.result())
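
In CPython, a ThreadPoolExecutor won't run CPU-bound loops like these in parallel because of the GIL; ProcessPoolExecutor offers the same submit/result interface but uses separate processes. A sketch of that variant:

import concurrent.futures

def func1():
    for i in range(0, 100000000):
        pass
    return 'abc'

def func2():
    for i in range(0, 100000000):
        pass
    return 'xyz'

if __name__ == '__main__':
    # create the executor under the __main__ guard so child processes
    # can re-import this module safely
    with concurrent.futures.ProcessPoolExecutor() as pool:
        f1 = pool.submit(func1)
        f2 = pool.submit(func2)
        print(f1.result(), f2.result())  # abc xyz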

How to return value from function run by multiprocessing process

You can use a Pipe, or a shared memory Value (or similarly an Array), to communicate between processes. Here's an example of using a Pipe:

import multiprocessing as mp

def worker(p):
    msg = 'Hello from child!'
    print("sending {!r} to parent".format(msg))
    p.send(msg)
    v = p.recv()
    print("got {!r} from parent".format(v))

if __name__ == '__main__':
    p_conn, c_conn = mp.Pipe()
    p = mp.Process(target=worker, args=(c_conn,))
    p.start()
    msg = 'Hello from parent!'
    print("got {!r} from child".format(p_conn.recv()))
    print("sending {!r} to child".format(msg))
    p_conn.send(msg)
    p.join()
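
For a single numeric result, the shared memory Value mentioned above is even simpler. A minimal sketch of that variant:

import multiprocessing as mp

def worker(result):
    # store the "return value" in the shared memory slot
    result.value = 42.0

if __name__ == '__main__':
    result = mp.Value('d', 0.0)  # shared double, initialized to 0.0
    p = mp.Process(target=worker, args=(result,))
    p.start()
    p.join()
    print(result.value)  # 42.0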

Or, you could use a Pool, which works in the most general case of needing N embarrassingly parallel workers, each with a return value. (Note, I'm using multiprocess here, which is a bit more flexible than multiprocessing -- e.g. it works better in the interpreter):

>>> import multiprocess as mp
>>> import time
>>> def process(n):
...     num = int()
...     while True:
...         print('...sleeping %s' % num)
...         time.sleep(1)
...         num += 1
...         if num > 10:
...             break
...     return time.time()
...
>>> mp.Pool(2).map(process, [None]*2)
...sleeping 0
...sleeping 0
...sleeping 1
...sleeping 1
...sleeping 2
...sleeping 2
...sleeping 3
...sleeping 3
...sleeping 4
...sleeping 4
...sleeping 5
...sleeping 5
...sleeping 6
...sleeping 6
...sleeping 7
...sleeping 7
...sleeping 8
...sleeping 8
...sleeping 9
...sleeping 9
...sleeping 10
...sleeping 10
[1540486371.700522, 1540486371.700522]

How do I efficiently retrieve the return value from a function executed with multiprocessing?

Using concurrent.futures.ProcessPoolExecutor makes things much easier.

First, in calculatespi, replace q.put(spi3) with return spi3 and remove the q parameter. Then the "main" code can be written as:

# Main Notebook
if __name__ == "__main__":

    from concurrent.futures import ProcessPoolExecutor

    args = []
    for x in range(3):
        for y in range(3):
            args.append(prcoba[:, x, y])

    with ProcessPoolExecutor() as executor:
        spipi = list(executor.map(calculatespi, args))
The executor takes care of everything else.
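
Since the original calculatespi and prcoba aren't shown in the question, here is a self-contained sketch of the same pattern with stand-in pieces (the mean computation and the array shape are placeholders, not the asker's actual code):

from concurrent.futures import ProcessPoolExecutor
import numpy as np

def calculatespi(series):
    # placeholder computation: return the result instead of q.put(...)
    return float(np.mean(series))

if __name__ == "__main__":
    prcoba = np.random.rand(100, 3, 3)  # stand-in for the asker's array
    args = [prcoba[:, x, y] for x in range(3) for y in range(3)]

    with ProcessPoolExecutor() as executor:
        results = list(executor.map(calculatespi, args))
    print(results)  # nine values, one per (x, y) slice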


