Get Return Value from Process

How to get the return value of a function passed to multiprocessing.Process?

Use a shared variable to communicate. For example:

import multiprocessing

def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

getting the return value of a function used in multiprocess

Answer

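from multiprocessing import Process, Queue

def my_func(q, arg):
    # Put the result on the queue instead of returning it
    q.put('Hello, ' + arg)

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=my_func, args=(q, 'John'))
    p1.start()
    print(q.get())  # blocks until the child has put its result
    p1.join()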

Get return value from process

In C#, start the process, wait for it to exit, then read its ExitCode:

Process P = Process.Start(sPhysicalFilePath, Param);
P.WaitForExit();
int result = P.ExitCode;

from MSDN
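
For comparison, Python's multiprocessing.Process exposes the same idea through its exitcode attribute. This is only a sketch: the worker function and the exit code 3 are made up for illustration. Keep in mind that an exit code is just a small integer, so it suits status reporting rather than general return values.

```python
import multiprocessing
import sys


def worker(code):
    """Hypothetical worker that ends the child process with the given exit code."""
    sys.exit(code)


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    p.join()            # wait for the child to exit, like WaitForExit()
    print(p.exitcode)   # exit code of the child, like ExitCode
```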

How to get function return value from a multiprocessing.Process?

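You can also use the concurrent.futures module. Note that this example uses a ThreadPoolExecutor, so the functions run in threads rather than in separate processes: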

import concurrent.futures
_pool = concurrent.futures.ThreadPoolExecutor()

def func1():
    for i in range(0, 100000000):
        pass
    return 'abc'

def func2():
    for i in range(0, 100000000):
        pass
    return 'xyz'

if __name__=='__main__':
    p1 = _pool.submit(func1)
    p2 = _pool.submit(func2)

    print(p1.result(), p2.result())
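
If the work should run in separate processes, concurrent.futures also provides ProcessPoolExecutor with the same submit()/result() interface. A minimal sketch, where slow_square and run_in_processes are hypothetical names chosen for illustration:

```python
import concurrent.futures


def slow_square(n):
    """CPU-bound stand-in: adds n to itself n times, which gives n * n."""
    total = 0
    for _ in range(n):
        total += n
    return total


def run_in_processes(values):
    """Submit one task per value to worker processes and collect the return values."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow_square, v) for v in values]
        # result() blocks until the task is done and hands back its return value
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_in_processes([10, 20, 30]))
```

The functions submitted to a process pool have to be defined at module level so the worker processes can find them.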

How to get the return value of a function in multiprocessing code

Use multiprocessing.Pool when you want to retrieve return values.

from multiprocessing import Pool

def print_cube(num):
    aa1 = num * num * num
    return aa1

def main():
    with Pool(5) as p:
        results = p.map(print_cube, range(10, 15))
    print(results)

if __name__ == "__main__":
    main()
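
Besides map, a Pool can also return the value of a single call through apply_async, and starmap handles functions that take several arguments. A small sketch, assuming a made-up function named power:

```python
from multiprocessing import Pool


def power(base, exponent):
    """Hypothetical worker function with two arguments."""
    return base ** exponent


def main():
    with Pool(2) as pool:
        # apply_async returns an AsyncResult; get() blocks until the value is ready
        async_result = pool.apply_async(power, (2, 10))
        single = async_result.get(timeout=10)
        # starmap unpacks each tuple into positional arguments
        many = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])
    return single, many


if __name__ == "__main__":
    print(main())
```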

How can I get return value from a spawned process in Erlang?

To pass information between processes, you use ! to send a message to another process's mailbox, and you use a receive clause to extract a message from a process mailbox. Here is an example:

-module(a).
-compile(export_all).

%% Worker process:
say(From, 2, 0) ->
    From ! {self(), [1,2]};
say(From, A, B) ->
    say(From, A-1, B-1).

%% Main process:
loop(0) ->
    ok;
loop(Times) ->
    Pid = spawn(a, say, [self(), 4, 2]),
    receive  %% waits here for result before spawning another process--no concurrency
        {Pid, Result} ->
            io:fwrite( "L is ~w ~n", [Result] )
    end,
    loop(Times-1).

%% Test:
run() ->
    loop(4).

In the shell:

7> c(a).   
a.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,a}

8> a:run().
L is [1,2]
L is [1,2]
L is [1,2]
L is [1,2]
ok

9>

Or, you can spawn all the processes, then read the results as they come in:

-module(a).
-compile(export_all).

%% Worker process:
say(From, 2, 0) ->
    From ! [1,2];
say(From, A, B) ->
    say(From, A-1, B-1).

%% Main process:
loop(N) ->
    loop(N, N).

loop(0, Times) ->
    display_results(Times);
loop(N, Times) ->
    spawn(a, say, [self(), 4, 2]),
    loop(N-1, Times).

display_results(0) ->
    ok;
display_results(Times) ->
    receive
        Result ->
            io:format("L is ~w~n", [Result])
    end,
    display_results(Times-1).

%% Test:
run() ->
    loop(4).

To ensure that you only receive messages from the processes that you spawned, you can do this:

-module(a).
-compile(export_all).

%% Worker process:
say(From, 2, 0) ->
    From ! {self(), [1,2]};
say(From, A, B) ->
    say(From, A-1, B-1).

%% Main process:
loop(Times) ->
    loop(Times, _Pids=[]).

loop(0, Pids) ->
    display_results(Pids);
loop(Times, Pids) ->
    Pid = spawn(a, say, [self(), 4, 2]),
    loop(Times-1, [Pid|Pids]).

display_results([]) ->
    ok;
display_results([Pid|Pids]) ->
    receive
        {Pid, Result} ->
            io:format("L is ~w~n", [Result])
    end,
    display_results(Pids).

%% Test:
run() ->
    loop(4).

There are some risks when using a receive like that: if a worker process crashes before it sends the message to your main process, then your main process will be stuck indefinitely in the receive while waiting for a message to arrive from the crashed process. One solution: use a timeout in the receive. Another: use spawn_monitor().

How to return value from function run by multiprocessing process

You can use a Pipe, or a shared memory Value (or similarly an Array), to communicate between processes. Here's an example of using a Pipe:

import multiprocessing as mp

def worker(p):
    msg = 'Hello from child!'
    print("sending {!r} to parent".format(msg))
    p.send(msg)
    v = p.recv()
    print("got {!r} from parent".format(v))

if __name__ == '__main__':
    p_conn, c_conn = mp.Pipe()
    p = mp.Process(target=worker, args=(c_conn,))
    p.start()
    msg = 'Hello from parent!'
    print("got {!r} from child".format(p_conn.recv()))
    print("sending {!r} to child".format(msg))
    p_conn.send(msg)
    p.join()
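
The shared memory Value and Array mentioned above could look roughly like this. It is only a sketch: the worker and the names total and squares are made up for illustration, and the child writes its results into shared memory instead of returning them.

```python
import multiprocessing as mp


def worker(total, squares):
    """Write results into shared memory instead of returning them."""
    for i in range(len(squares)):
        squares[i] = i * i
    # get_lock() guards the update of the shared value
    with total.get_lock():
        total.value = sum(squares[:])


if __name__ == "__main__":
    total = mp.Value("i", 0)     # shared C int, initially 0
    squares = mp.Array("i", 5)   # shared array of 5 C ints, zero-initialized
    p = mp.Process(target=worker, args=(total, squares))
    p.start()
    p.join()
    print(total.value, squares[:])
```

Value and Array only hold simple C types (the "i" typecode means a signed int), so for arbitrary Python objects a Pipe or a Queue is the better fit.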

Or, you could use a Pool, which works in the most general case of needing N embarrassingly parallel workers, each with a return value. (Note, I'm using multiprocess here, which is a bit more flexible than multiprocessing -- e.g. it works better in the interpreter):

>>> import multiprocess as mp
>>> import time
>>> def process(n):
...     num = int()
...     while True:
...         print('...sleeping %s' % num)
...         time.sleep(1)
...         num += 1
...         if num > 10:
...             break
...     return time.time()
...
>>> mp.Pool(2).map(process, [None]*2)
...sleeping 0
...sleeping 0
...sleeping 1
...sleeping 1
...sleeping 2
...sleeping 2
...sleeping 3
...sleeping 3
...sleeping 4
...sleeping 4
...sleeping 5
...sleeping 5
...sleeping 6
...sleeping 6
...sleeping 7
...sleeping 7
...sleeping 8
...sleeping 8
...sleeping 9
...sleeping 9
...sleeping 10
...sleeping 10
[1540486371.700522, 1540486371.700522]

How to return values from Process- or Thread instances?

You will need a multiprocessing.Pipe or a multiprocessing.Queue to send the results back to your parent process. If you just do I/O, you should use a Thread instead of a Process, since it's more lightweight and most of the time will be spent waiting. I'm showing you how it's done for Process and Thread in general.


Process with Queue

The multiprocessing queue is built on top of a pipe, and access is synchronized with locks/semaphores. Queues are thread- and process-safe, meaning you can use one queue for multiple producer/consumer processes and even multiple threads in these processes. Adding the first item to the queue will also start a feeder thread in the calling process. The additional overhead of a multiprocessing.Queue makes using a pipe for single-producer/single-consumer scenarios preferable and more performant.

Here's how to send and retrieve a result with a multiprocessing.Queue:

from multiprocessing import Process, Queue

SENTINEL = 'SENTINEL'

def sim_busy(out_queue, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)

if __name__ == '__main__':

    out_queue = Queue()

    p = Process(target=sim_busy, args=(out_queue, 150e6))  # 150e6 == 150000000.0
    p.start()

    for result in iter(out_queue.get, SENTINEL):  # sentinel breaks the loop
        print(result)

The queue is passed as an argument into the function, results are .put() on the queue, and the parent .get()s them from the queue. .get() is a blocking call: execution does not resume until there is something to get (specifying a timeout parameter is possible). Note that the work sim_busy does here is CPU-intensive; that is when you would choose processes over threads.
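
The timeout parameter mentioned above can keep the parent from blocking forever, for example when the worker dies before putting anything on the queue. A minimal sketch, assuming a hypothetical worker compute and a hypothetical helper get_or_default:

```python
import queue  # multiprocessing.Queue.get raises queue.Empty on timeout
from multiprocessing import Process, Queue


def compute(out_queue, x):
    """Hypothetical worker: put a single result on the queue."""
    out_queue.put(x * 2)


def get_or_default(out_queue, timeout, default=None):
    """Return the next item, or `default` if nothing arrives within `timeout` seconds."""
    try:
        return out_queue.get(timeout=timeout)
    except queue.Empty:
        return default


if __name__ == "__main__":
    out_queue = Queue()
    p = Process(target=compute, args=(out_queue, 21))
    p.start()
    print(get_or_default(out_queue, timeout=5))
    p.join()
```

Whether to fall back to a default or to raise an error on timeout depends on the application; the 5 seconds here are an arbitrary choice.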


Process & Pipe

For one-to-one connections a pipe is enough. The setup is nearly identical; just the methods are named differently, and a call to Pipe() returns two connection objects. In duplex mode, both objects are read-write ends. With duplex=False (simplex), the first connection object is the read end of the pipe and the second is the write end. In this basic scenario we just need a simplex pipe:

from multiprocessing import Process, Pipe

SENTINEL = 'SENTINEL'

def sim_busy(write_conn, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    write_conn.send(result)
    # If all results are sent, send a sentinel-value to let the parent know
    # no more results will come.
    write_conn.send(SENTINEL)

if __name__ == '__main__':

    # duplex=False because we just need one-way communication in this case.
    read_conn, write_conn = Pipe(duplex=False)

    p = Process(target=sim_busy, args=(write_conn, 150e6))  # 150e6 == 150000000.0
    p.start()

    for result in iter(read_conn.recv, SENTINEL):  # sentinel breaks the loop
        print(result)

Thread & Queue

For use with threading, you want to switch to queue.Queue. queue.Queue is built on top of a collections.deque, adding some locks to make it thread-safe. Unlike with multiprocessing's queue and pipe, objects put on a queue.Queue won't get pickled. Since threads share the same memory address space, serialization for memory copying is unnecessary; only pointers are transmitted.

from threading import Thread
from queue import Queue
import time

SENTINEL = 'SENTINEL'

def sim_io(out_queue, query):
    time.sleep(1)
    result = query + '_result'
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)

if __name__ == '__main__':

    out_queue = Queue()

    p = Thread(target=sim_io, args=(out_queue, 'my_query'))
    p.start()

    for result in iter(out_queue.get, SENTINEL):  # sentinel-value breaks the loop
        print(result)
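
To see that a queue.Queue hands over the very same object rather than a pickled copy, a quick check like the following can help. The names producer and roundtrip are made up for this sketch.

```python
from queue import Queue
from threading import Thread


def producer(out_queue, payload):
    """Put the object itself on the queue; no copy is made."""
    out_queue.put(payload)


def roundtrip(payload):
    """Send payload through a queue.Queue via a thread and return what arrives."""
    q = Queue()
    t = Thread(target=producer, args=(q, payload))
    t.start()
    received = q.get()
    t.join()
    return received


if __name__ == "__main__":
    data = {"rows": [1, 2, 3]}
    print(roundtrip(data) is data)  # True: same object, not a copy
```

This is also why objects that cannot be pickled, such as lambdas, can travel through a queue.Queue without problems.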

  • Prefer for result in iter(out_queue.get, SENTINEL): over a while True...break setup, where possible.
  • Use if __name__ == '__main__': in all your scripts, and especially with multiprocessing.

