multiprocessing.Pool: When to use apply, apply_async or map?
Back in the old days of Python, to call a function with arbitrary arguments, you would use apply:

apply(f, args, kwargs)

apply still exists in Python 2.7, though not in Python 3, and is generally not used anymore. Nowadays,

f(*args, **kwargs)

is preferred. The multiprocessing.Pool module tries to provide a similar interface.
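For example, the modern syntax unpacks a tuple of positional arguments and a dict of keyword arguments directly (a minimal sketch with a made-up function f):

```python
def f(a, b, scale=1):
    # A stand-in function to illustrate the call syntax.
    return (a + b) * scale

args = (2, 3)
kwargs = {'scale': 10}

# Modern replacement for Python 2's apply(f, args, kwargs):
print(f(*args, **kwargs))  # 50
```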
Pool.apply is like Python's apply, except that the function call is performed in a separate process. Pool.apply blocks until the function is completed.

Pool.apply_async is also like Python's built-in apply, except that the call returns immediately instead of waiting for the result. An AsyncResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed. Thus, pool.apply(func, args, kwargs) is equivalent to pool.apply_async(func, args, kwargs).get().
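The equivalence can be checked directly; here is a minimal sketch (the square function is made up for illustration):

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        blocking = pool.apply(square, (7,))        # blocks until done
        deferred = pool.apply_async(square, (7,))  # returns immediately
        print(blocking, deferred.get())            # get() blocks for the value
```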
In contrast to Pool.apply, the Pool.apply_async method also has a callback which, if supplied, is called when the function is complete. This can be used instead of calling get().
For example:
import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []

def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args=(i,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()
may yield a result such as

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

Notice that, unlike pool.map, the order of the results may not correspond to the order in which the pool.apply_async calls were made.
So, if you need to run a function in a separate process, but want the current process to block until that function returns, use Pool.apply. Like Pool.apply, Pool.map blocks until the complete result is returned.

If you want the Pool of worker processes to perform many function calls asynchronously, use Pool.apply_async. The order of the results is not guaranteed to be the same as the order of the calls to Pool.apply_async.
Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function). In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments.
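A small sketch of that ordering guarantee, using an artificial slow_square function whose runtime varies so that completion order differs from submission order:

```python
import multiprocessing as mp
import random
import time

def slow_square(x):
    # Sleep a random amount so workers finish in an unpredictable order.
    time.sleep(random.random() / 20)
    return x * x

if __name__ == '__main__':
    with mp.Pool(4) as pool:
        # pool.map still returns results in argument order:
        print(pool.map(slow_square, range(5)))  # always [0, 1, 4, 9, 16]
```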
Purpose of multiprocessing.Pool.apply and multiprocessing.Pool.apply_async
First off, both are meant to operate on argument tuples (single function calls), contrary to the Pool.map variants, which operate on iterables. So it's not an error when you observe only one process being used when you call these functions only once.

You would use Pool.apply_async instead of one of the Pool.map versions where you need more fine-grained control over the single tasks you want to distribute. The Pool.map versions take an iterable and chunk it into tasks, where every task has the same (mapped) target function.

Pool.apply_async typically isn't called only once with a pool of more than one worker. Since it's asynchronous, you can iterate over manually pre-bundled tasks and submit them to several worker processes before any of them has completed. Your task list can consist of different target functions, and you can also register callbacks for results and errors.

These properties make Pool.apply_async pretty versatile and a first-choice tool for unusual problem scenarios you cannot get done with one of the Pool.map versions.
Pool.apply indeed is not widely useful at first sight (and second). You could use it to synchronize control flow in a scenario where you start up multiple tasks with apply_async first and then have a task that has to be completed before you fire up another round of tasks with apply_async.

Using Pool.apply could also just mean sparing yourself the creation of a single extra Process for an in-between task, when you already have a pool that is currently idling.
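A sketch of that control-flow pattern, assuming made-up work/checkpoint functions: the blocking Pool.apply serves as a barrier between two rounds of apply_async, reusing an idle pool worker instead of spawning an extra Process:

```python
import multiprocessing as mp

def work(x):
    return x + 1

def checkpoint(results):
    # The in-between task: must complete before the second round starts.
    return sum(results)

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        first = [pool.apply_async(work, (i,)) for i in range(4)]
        partial = [r.get() for r in first]          # [1, 2, 3, 4]
        total = pool.apply(checkpoint, (partial,))  # blocks: acts as the barrier
        second = [pool.apply_async(work, (total + i,)) for i in range(2)]
        print([r.get() for r in second])            # [11, 12]
```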
map_async vs apply_async: what should I use in this case
I would recommend map_async for three reasons:

1. It's cleaner-looking code. This:

pool = Pool(processes=proc_num)
async_result = pool.map_async(post_processing_0.main, split_list)
pool.close()
pool.join()

looks nicer than this:

pool = Pool(processes=proc_num)
P = {}
for i in range(proc_num):
    P['process_' + str(i)] = pool.apply_async(post_processing_0.main, [split_list[i]])
pool.close()
pool.join()

2. With apply_async, if an exception occurs inside of post_processing_0.main, you won't know about it unless you explicitly call P['process_x'].get() on the failing AsyncResult object, which would require iterating over all of P. With map_async the exception will be raised if you call async_result.get() - no iteration required.

3. map_async has built-in chunking functionality, which will make your code perform noticeably better if split_list is very large.
Other than that, the behavior is basically the same if you don't care about the results.
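To illustrate the exception behavior in point 2 (with a made-up may_fail function standing in for post_processing_0.main): the worker's exception is re-raised by the single get() call on map_async's result:

```python
import multiprocessing as mp

def may_fail(x):
    if x == 3:
        raise ValueError('bad input: %d' % x)
    return 2 * x

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        async_result = pool.map_async(may_fail, range(5))
        pool.close()
        pool.join()
        try:
            async_result.get()   # re-raises the worker's ValueError here
        except ValueError as exc:
            print('caught:', exc)
```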
Multiprocessing pool 'apply_async' only seems to call function once
apply_async isn't meant to launch multiple processes; it's just meant to call the function with the arguments in one of the processes of the pool. You'll need to make 10 calls if you want the function to be called 10 times.
First, note the docs on apply()
(emphasis added):
apply(func[, args[, kwds]])
Call func with arguments args and keyword arguments kwds. It blocks
until the result is ready. Given this blocks, apply_async() is better
suited for performing work in parallel. Additionally, func is only
executed in one of the workers of the pool.
Now, in the docs for apply_async()
:
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
A variant of the apply() method which returns a result object.
The difference between the two is just that apply_async returns immediately. You can use map() to call a function multiple times, though if you're calling with the same inputs, then it's a little redundant to create a list of the same argument just to have a sequence of the right length.

However, if you're calling different functions with the same input, then you're really just calling a higher-order function, and you could do it with map or map_async() like this:

pool.map(lambda f: f(1), functions)

except that lambda functions aren't picklable, so you'd need to use a defined function (see How to let Pool.map take a lambda function). In Python 2 you could reach for the builtin apply() (not the multiprocessing one), but it was deprecated there and removed in Python 3. It's easy enough to write your own:

def apply_(f, *args, **kwargs):
    return f(*args, **kwargs)

pool.starmap(apply_, [(f, 1) for f in functions])

(pool.starmap, available since Python 3.3, unpacks each tuple into positional arguments.)
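A self-contained sketch of this higher-order dispatch idea on Python 3, with hypothetical double/square target functions:

```python
import multiprocessing as mp

def double(x):
    return 2 * x

def square(x):
    return x * x

def apply_(f, *args):
    # Module-level helper, so it is picklable (a lambda would not be).
    return f(*args)

if __name__ == '__main__':
    functions = [double, square]
    with mp.Pool(2) as pool:
        # starmap unpacks each (function, argument) tuple into apply_.
        print(pool.starmap(apply_, [(f, 3) for f in functions]))  # [6, 9]
```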
Can I pass a method to apply_async or map in python multiprocessing?
The problem appears to be due to the fact that multiprocessing needs to pickle self.f, and in Python 2 bound methods are not picklable (Python 3 can pickle them). There is a discussion on how to solve the problem here.

apply_async stores the exception raised in the worker inside the AsyncResult (the "future") it returns. That's why nothing is printed. If get() is executed on the future, then the exception is raised.
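On Python 3 the pickling restriction is gone; a minimal sketch with a made-up class, showing that a bound method of a picklable instance can be submitted directly:

```python
import multiprocessing as mp

class Scaler:
    def __init__(self, factor):
        self.factor = factor

    def scale(self, x):
        return self.factor * x

if __name__ == '__main__':
    s = Scaler(3)
    with mp.Pool(2) as pool:
        # Pickling s.scale works on Python 3 because the instance
        # itself is picklable; on Python 2 this raised a PicklingError.
        res = pool.apply_async(s.scale, (5,))
        print(res.get())  # 15
```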
why is more than one worker used in `multiprocessing.Pool().apply_async()`?
Your confusion seems to come from thinking [pool.apply_async(...) for i in range(10)] is one call, when there are really ten independent calls. A call to any pool method is a "job". A job generally can lead to one or multiple tasks being distributed. The apply methods always produce only a single task under the hood. A task is an indivisible unit of work which will be received as a whole by a random pool worker.

There's only one shared inqueue, over which all workers are fed. Which idling worker will be woken up from waiting to get() a task from that queue is up to the OS. Your result distribution for case 1 is still somewhat surprising and probably very lucky, at least unless you can confirm you have only two cores.

And yes, your observation for this run is also influenced by the computation time needed for a task, since threads (the scheduled execution unit within a process) are usually scheduled with time-slicing policies (e.g. ~20 ms on Windows).