multiprocessing.Pool: What's the difference between map_async and imap?
There are two key differences between imap/imap_unordered and map/map_async:
- The way they consume the iterable you pass to them.
- The way they return the result back to you.
map consumes your iterable by converting it to a list (assuming it isn't a list already), breaking that list into chunks, and sending those chunks to the worker processes in the Pool. Breaking the iterable into chunks performs better than passing items between processes one at a time, particularly if the iterable is large. However, turning the iterable into a list in order to chunk it can have a very high memory cost, since the entire list will need to be kept in memory.
imap doesn't turn the iterable you give it into a list, nor does it break it into chunks (by default). It iterates over the iterable one element at a time, sending each element to a worker process. This means you don't take the memory hit of converting the whole iterable to a list, but it also means performance is slower for large iterables, because of the lack of chunking. That can be mitigated by passing a chunksize argument larger than the default of 1, however.
The other major difference between imap/imap_unordered and map/map_async is that with imap/imap_unordered, you can start receiving results from workers as soon as they're ready, rather than having to wait for all of them to be finished. With map_async, an AsyncResult is returned right away, but you can't actually retrieve results from that object until all of them have been processed, at which point it returns the same list that map does (map is actually implemented internally as map_async(...).get()). There's no way to get partial results; you either have the entire result, or nothing.
imap and imap_unordered both return iterables right away. With imap, the results will be yielded from the iterable as soon as they're ready, while still preserving the ordering of the input iterable. With imap_unordered, results will be yielded as soon as they're ready, regardless of the order of the input iterable. So, say you have this:
import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1, 5, 3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
This will output:
3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
If you use p.imap_unordered instead of p.imap, you'll see:
3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)
If you use p.map or p.map_async().get(), you'll see:
3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
So, the primary reasons to use imap/imap_unordered over map_async are:
- Your iterable is large enough that converting it to a list would cause you to run out of/use too much memory.
- You want to be able to start processing the results before all of them are completed.
Python Multiprocessing: What's the difference between map and imap?
That is the difference. One reason why you might use imap instead of map is if you wanted to start processing the first few results without waiting for the rest to be calculated. map waits for every result before returning.
As for chunksize, it is sometimes more efficient to dole out work in larger quantities because every time the worker requests more work, there is IPC and synchronization overhead.
multiprocessing: map vs map_async
There are four choices for mapping jobs to processes. You have to consider multi-args, concurrency, blocking, and ordering. map and map_async only differ with respect to blocking: map_async is non-blocking, whereas map is blocking.
So let's say you had a function:
from multiprocessing import Pool
import time

def f(x):
    print x*x

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(f, range(10))
    r = pool.map_async(f, range(10))
    # DO STUFF
    print 'HERE'
    print 'MORE'
    r.wait()
    print 'DONE'
Example output:
0
1
9
4
16
25
36
49
64
81
0
HERE
1
4
MORE
16
25
36
9
49
64
81
DONE
pool.map(f, range(10)) will wait for all 10 of those function calls to finish, so we see all the prints in a row. r = pool.map_async(f, range(10)) will execute them asynchronously and only block when r.wait() is called, so we see HERE and MORE in between, but DONE will always be at the end.
multiprocessing.Pool: When to use apply, apply_async or map?
Back in the old days of Python, to call a function with arbitrary arguments, you would use apply:
apply(f, args, kwargs)
apply still exists in Python 2.7, though not in Python 3, and is generally not used anymore. Nowadays,
f(*args, **kwargs)
is preferred. The multiprocessing.Pool module tries to provide a similar interface.
Pool.apply is like Python's apply, except that the function call is performed in a separate process. Pool.apply blocks until the function is completed.
Pool.apply_async is also like Python's built-in apply, except that the call returns immediately instead of waiting for the result. An AsyncResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed. Thus, pool.apply(func, args, kwargs) is equivalent to pool.apply_async(func, args, kwargs).get().
In contrast to Pool.apply, the Pool.apply_async method also has a callback which, if supplied, is called when the function is complete. This can be used instead of calling get().
For example:
import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args=(i,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()
may yield a result such as
[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]
Notice, unlike pool.map, the order of the results may not correspond to the order in which the pool.apply_async calls were made.
So, if you need to run a function in a separate process, but want the current process to block until that function returns, use Pool.apply. Like Pool.apply, Pool.map blocks until the complete result is returned.
If you want the Pool of worker processes to perform many function calls asynchronously, use Pool.apply_async. The order of the results is not guaranteed to be the same as the order of the calls to Pool.apply_async.
Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function).
In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments.
Python multiprocessing Pool map and imap
Since you already put all your files in a list, you could put them directly into a queue. The queue is then shared with your sub-processes, which take the file names from the queue and do their stuff. No need to do it twice (first into a list, then pickling the list via Pool.imap). Pool.imap is doing exactly the same thing, just without you knowing it.
todolist = []
for infile in os.listdir():
    todolist.append(infile)
can be replaced by:
todolist = Queue()
for infile in os.listdir():
    todolist.put(infile)
The complete solution would then look like:
from multiprocessing import Process, Queue
import os

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        # do stuff until inqueue.get returns "STOP":
        # read infile
        # compare things in infile
        # acquire Lock, save things in outfile, release Lock
        # delete infile
        pass

def main():
    nprocesses = 8
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)
        todolist = Queue()
        for infile in os.listdir():
            todolist.put(infile)
        process = [Process(target=process_file, args=(todolist,))
                   for x in range(nprocesses)]
        for p in process:
            # tell the processes to stop when all files are handled;
            # the "STOP" sentinels sit at the very end of the queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()

if __name__ == '__main__':
    main()
Python Multiprocessing.Pool lazy iteration
Let's look at the end of the program first.
The multiprocessing module uses atexit to call multiprocessing.util._exit_function when your program ends.
If you remove g2.next(), your program ends quickly.
The _exit_function eventually calls Pool._terminate_pool. The main thread changes the state of pool._task_handler._state from RUN to TERMINATE. Meanwhile the pool._task_handler thread is looping in Pool._handle_tasks and bails out when it reaches the condition
if thread._state:
    debug('task handler found thread._state != RUN')
    break
(See /usr/lib/python2.6/multiprocessing/pool.py)
This is what stops the task handler from fully consuming your generator, g(). If you look in Pool._handle_tasks you'll see
for i, task in enumerate(taskseq):
    ...
    try:
        put(task)
    except IOError:
        debug('could not put task on queue')
        break
This is the code which consumes your generator. (taskseq is not exactly your generator, but as taskseq is consumed, so is your generator.)
In contrast, when you call g2.next(), the main thread calls IMapIterator.next and waits when it reaches self._cond.wait(timeout).
The fact that the main thread is waiting instead of calling _exit_function is what allows the task handler thread to run normally, which means fully consuming the generator as it puts tasks in the workers' inqueue in the Pool._handle_tasks function.
The bottom line is that all Pool map functions consume the entire iterable they are given. If you'd like to consume the generator in chunks, you could do this instead:
import multiprocessing as mp
import itertools
import time

def g():
    for el in xrange(50):
        print el
        yield el

def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)  # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)