How to get the return value of a function passed to multiprocessing.Process?
Use a shared variable to communicate. For example:
import multiprocessing

def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
Getting the return value of a function used in multiprocessing
Answer
from multiprocessing import Process, Queue

def my_func(q, arg):
    q.put('Hello, ' + arg)

if __name__ == '__main__':
    # Pass the queue as an argument instead of using a module-level global,
    # so this also works with the spawn start method (Windows, macOS default).
    q = Queue()
    p1 = Process(target=my_func, args=(q, 'John'))
    p1.start()
    print(q.get())
    p1.join()
Get return value from process
Use (note: this answer is for C#'s System.Diagnostics.Process, not Python):

Process P = Process.Start(sPhysicalFilePath, Param);
P.WaitForExit();
int result = P.ExitCode;

from MSDN
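The same exit-code idea carries over to Python: multiprocessing.Process exposes an exitcode attribute after join(). This is only a sketch of the idea; an exit code can carry just a small integer status (effectively 0-255 on most platforms), not an arbitrary return value:

```python
import multiprocessing
import sys

def exit_worker(n):
    # Communicate a small integer result via the process exit status.
    sys.exit(n * 2)

if __name__ == "__main__":
    p = multiprocessing.Process(target=exit_worker, args=(21,))
    p.start()
    p.join()
    print(p.exitcode)  # the child's exit status: 42
```

For real return values, prefer the queue, pipe, or Pool approaches shown in the other answers.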
How to get function return value from a multiprocessing.Process?
You can also use the concurrent.futures module:
import concurrent.futures

_pool = concurrent.futures.ThreadPoolExecutor()

def func1():
    for i in range(0, 100000000):
        pass
    return 'abc'

def func2():
    for i in range(0, 100000000):
        pass
    return 'xyz'

if __name__ == '__main__':
    p1 = _pool.submit(func1)
    p2 = _pool.submit(func2)
    print(p1.result(), p2.result())
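Since the question is about processes rather than threads, the same pattern works with ProcessPoolExecutor; only the executor class changes. A minimal sketch (the function must be defined at module top level so it can be pickled):

```python
import concurrent.futures

def cube(n):
    return n ** 3

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as pool:
        f1 = pool.submit(cube, 3)
        f2 = pool.submit(cube, 4)
        print(f1.result(), f2.result())  # 27 64
```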
How to get the return value of a function in multiprocessing code
Use multiprocessing.Pool when you want to retrieve return values.
from multiprocessing import Pool

def print_cube(num):
    aa1 = num * num * num
    return aa1

def main():
    with Pool(5) as p:
        results = p.map(print_cube, range(10, 15))
    print(results)

if __name__ == "__main__":
    main()
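If you need results one job at a time rather than a mapped batch, Pool.apply_async returns an AsyncResult whose .get() blocks until that particular job finishes. A sketch using a hypothetical cube function:

```python
from multiprocessing import Pool

def cube(num):
    return num * num * num

if __name__ == "__main__":
    with Pool(2) as p:
        # Submit each job individually, then collect results in order.
        async_results = [p.apply_async(cube, (n,)) for n in range(10, 15)]
        print([r.get() for r in async_results])  # [1000, 1331, 1728, 2197, 2744]
```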
How can I get return value from a spawned process in Erlang?
To pass information between processes, you use ! to send a message to another process's mailbox, and you use a receive clause to extract a message from a process's mailbox. Here is an example:
-module(a).
-compile(export_all).

%% Worker process:
say(From, 2, 0) ->
    From ! {self(), [1,2]};
say(From, A, B) ->
    say(From, A-1, B-1).

%% Main process:
loop(0) ->
    ok;
loop(Times) ->
    Pid = spawn(a, say, [self(), 4, 2]),
    receive  %% waits here for the result before spawning another process--no concurrency
        {Pid, Result} ->
            io:fwrite("L is ~w ~n", [Result])
    end,
    loop(Times-1).

%% Test:
run() ->
    loop(4).
In the shell:
7> c(a).
a.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,a}
8> a:run().
L is [1,2]
L is [1,2]
L is [1,2]
L is [1,2]
ok
9>
Or, you can spawn all the processes, then read the results as they come in:
-module(a).
-compile(export_all).

%% Worker process:
say(From, 2, 0) ->
    From ! [1,2];
say(From, A, B) ->
    say(From, A-1, B-1).

%% Main process:
loop(N) ->
    loop(N, N).

loop(0, Times) ->
    display_results(Times);
loop(N, Times) ->
    spawn(a, say, [self(), 4, 2]),
    loop(N-1, Times).

display_results(0) ->
    ok;
display_results(Times) ->
    receive
        Result ->
            io:format("L is ~w~n", [Result])
    end,
    display_results(Times-1).

%% Test:
run() ->
    loop(4).
To ensure that you only receive messages from the processes that you spawned, you can do this:
-module(a).
-compile(export_all).

%% Worker process:
say(From, 2, 0) ->
    From ! {self(), [1,2]};
say(From, A, B) ->
    say(From, A-1, B-1).

%% Main process:
loop(Times) ->
    loop(Times, _Pids=[]).

loop(0, Pids) ->
    display_results(Pids);
loop(Times, Pids) ->
    Pid = spawn(a, say, [self(), 4, 2]),
    loop(Times-1, [Pid|Pids]).

display_results([]) ->
    ok;
display_results([Pid|Pids]) ->
    receive
        {Pid, Result} ->
            io:format("L is ~w~n", [Result])
    end,
    display_results(Pids).

%% Test:
run() ->
    loop(4).
There are some risks when using a receive like that: if a worker process crashes before it sends the message to your main process, then your main process will be stuck indefinitely in the receive, waiting for a message from the crashed process. One solution is to use a timeout in the receive; another is to use spawn_monitor().
How to return value from function run by multiprocessing process
You can use a Pipe, or a shared-memory Value (or similarly an Array), to communicate between processes. Here's an example of using a Pipe:
import multiprocessing as mp

def worker(p):
    msg = 'Hello from child!'
    print("sending {!r} to parent".format(msg))
    p.send(msg)
    v = p.recv()
    print("got {!r} from parent".format(v))

if __name__ == '__main__':
    p_conn, c_conn = mp.Pipe()
    p = mp.Process(target=worker, args=(c_conn,))
    p.start()
    msg = 'Hello from parent!'
    print("got {!r} from child".format(p_conn.recv()))
    print("sending {!r} to child".format(msg))
    p_conn.send(msg)
    p.join()
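The shared-memory Value and Array mentioned above would look roughly like this; the type codes ('i' for int, 'd' for double) follow the array module:

```python
import multiprocessing as mp

def shm_worker(val, arr):
    # Mutate shared memory in the child; the parent sees the changes after join().
    val.value = 42
    for i in range(len(arr)):
        arr[i] = arr[i] * 2

if __name__ == '__main__':
    val = mp.Value('i', 0)                 # shared int
    arr = mp.Array('d', [1.0, 2.0, 3.0])   # shared array of doubles
    p = mp.Process(target=shm_worker, args=(val, arr))
    p.start()
    p.join()
    print(val.value, arr[:])  # 42 [2.0, 4.0, 6.0]
```

This suits fixed-size numeric results; for arbitrary Python objects, a Pipe or Queue is the better fit.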
Or, you could use a Pool, which works in the most general case of needing N embarrassingly parallel workers, each with a return value. (Note, I'm using multiprocess here, which is a bit more flexible than multiprocessing -- e.g. it works better in the interpreter):
>>> import multiprocess as mp
>>> import time
>>> def process(n):
...     num = 0
...     while True:
...         print('...sleeping %s' % num)
...         time.sleep(1)
...         num += 1
...         if num > 10:
...             break
...     return time.time()
...
>>> mp.Pool(2).map(process, [None]*2)
...sleeping 0
...sleeping 0
...sleeping 1
...sleeping 1
...sleeping 2
...sleeping 2
...sleeping 3
...sleeping 3
...sleeping 4
...sleeping 4
...sleeping 5
...sleeping 5
...sleeping 6
...sleeping 6
...sleeping 7
...sleeping 7
...sleeping 8
...sleeping 8
...sleeping 9
...sleeping 9
...sleeping 10
...sleeping 10
[1540486371.700522, 1540486371.700522]
How to return values from Process- or Thread instances?
You will need a multiprocessing.Pipe or a multiprocessing.Queue to send the results back to your parent process. If you just do I/O, you should use a Thread instead of a Process, since it's more lightweight and most time will be spent waiting. I'm showing how it's done for processes and threads in general.
Process with Queue
The multiprocessing queue is built on top of a pipe, and access is synchronized with locks/semaphores. Queues are thread- and process-safe, meaning you can use one queue for multiple producer/consumer processes and even multiple threads within those processes. Adding the first item to the queue also starts a feeder thread in the calling process. Because of this additional overhead, a pipe is preferable and more performant for single-producer/single-consumer scenarios.
Here's how to send and retrieve a result with a multiprocessing.Queue:
from multiprocessing import Process, Queue

SENTINEL = 'SENTINEL'

def sim_busy(out_queue, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)

if __name__ == '__main__':
    out_queue = Queue()
    p = Process(target=sim_busy, args=(out_queue, 150e6))  # 150e6 == 150000000.0
    p.start()
    for result in iter(out_queue.get, SENTINEL):  # sentinel breaks the loop
        print(result)
The queue is passed as an argument into the function; results are .put() on the queue, and the parent .get()s them from the queue. .get() is a blocking call: execution does not resume until there is something to get (a timeout parameter can be specified). Note that the work sim_busy does here is CPU-intensive; that's when you would choose processes over threads.
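The timeout mentioned above raises queue.Empty if nothing arrives in time, which avoids blocking forever on a worker that died. A minimal sketch:

```python
import queue
from multiprocessing import Queue

q = Queue()
try:
    # Nothing was ever put on the queue, so this times out.
    q.get(timeout=0.1)
except queue.Empty:
    print("no result within 0.1 seconds")
```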
Process & Pipe
For one-to-one connections a pipe is enough. The setup is nearly identical; just the methods are named differently, and a call to Pipe() returns two connection objects. In duplex mode, both objects are read-write ends; with duplex=False (simplex) the first connection object is the read-end of the pipe and the second is the write-end. In this basic scenario we just need a simplex pipe:
from multiprocessing import Process, Pipe

SENTINEL = 'SENTINEL'

def sim_busy(write_conn, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    write_conn.send(result)
    # If all results are sent, send a sentinel-value to let the parent know
    # no more results will come.
    write_conn.send(SENTINEL)

if __name__ == '__main__':
    # duplex=False because we just need one-way communication in this case.
    read_conn, write_conn = Pipe(duplex=False)
    p = Process(target=sim_busy, args=(write_conn, 150e6))  # 150e6 == 150000000.0
    p.start()
    for result in iter(read_conn.recv, SENTINEL):  # sentinel breaks the loop
        print(result)
Thread & Queue
For use with threading, you want to switch to queue.Queue. queue.Queue is built on top of a collections.deque, adding some locks to make it thread-safe. Unlike multiprocessing's queue and pipe, objects put on a queue.Queue won't get pickled. Since threads share the same memory address space, serialization for memory copying is unnecessary; only pointers are transmitted.
from threading import Thread
from queue import Queue
import time

SENTINEL = 'SENTINEL'

def sim_io(out_queue, query):
    time.sleep(1)
    result = query + '_result'
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)

if __name__ == '__main__':
    out_queue = Queue()
    p = Thread(target=sim_io, args=(out_queue, 'my_query'))
    p.start()
    for result in iter(out_queue.get, SENTINEL):  # sentinel-value breaks the loop
        print(result)
- Read here why for result in iter(out_queue.get, SENTINEL): should be preferred over a while True ... break setup, where possible.
- Read here why you should use if __name__ == '__main__': in all your scripts, and especially in multiprocessing.
- More about get()-usage here.