multiprocessing.Pool: What's the difference between map_async and imap?

There are two key differences between imap/imap_unordered and map/map_async:

  1. The way they consume the iterable you pass to them.
  2. The way they return the result back to you.

map consumes your iterable by converting it to a list (assuming it isn't a list already), breaking it into chunks, and sending those chunks to the worker processes in the Pool. Breaking the iterable into chunks performs better than passing each item between processes one at a time, particularly if the iterable is large. However, turning the iterable into a list in order to chunk it can have a very high memory cost, since the entire list must be kept in memory.
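
For instance, here is a minimal sketch (square is a hypothetical stand-in for real work) showing that map materializes a generator before distributing it:

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # Even though a generator is passed, map converts it to a list
        # internally before chunking, so all 100,000 items are held in
        # memory at once.
        results = pool.map(square, (i for i in range(100000)))
        print(results[:5])  # [0, 1, 4, 9, 16]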

imap doesn't turn the iterable you give it into a list, nor does it break it into chunks (by default). It iterates over the iterable one element at a time and sends each element to a worker process. This means you don't take the memory hit of converting the whole iterable to a list, but it also means performance is slower for large iterables, because of the lack of chunking. That can be mitigated by passing a chunksize argument larger than the default of 1, however, as the sketch below shows.
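
A minimal sketch of that mitigation, using the same hypothetical square function as above:

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # imap pulls from the generator lazily; chunksize=100 restores most
        # of map's batching performance without building a full list first.
        for result in pool.imap(square, (i for i in range(100000)), chunksize=100):
            pass  # results arrive incrementally, in input order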

The other major difference between imap/imap_unordered and map/map_async is that with imap/imap_unordered, you can start receiving results from workers as soon as they're ready, rather than having to wait for all of them to finish. With map_async, an AsyncResult is returned right away, but you can't actually retrieve results from that object until all of them have been processed, at which point it returns the same list that map does (map is actually implemented internally as map_async(...).get()). There's no way to get partial results; you either have the entire result, or nothing.
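
To make that concrete, a sketch with a hypothetical square function:

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        async_result = pool.map_async(square, [1, 2, 3])
        # map_async returned immediately, but get() blocks until *every*
        # result is ready; there is no way to retrieve partial results.
        print(async_result.get())  # [1, 4, 9]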

imap and imap_unordered both return iterables right away. With imap, the results will be yielded from the iterable as soon as they're ready, while still preserving the ordering of the input iterable. With imap_unordered, results will be yielded as soon as they're ready, regardless of the order of the input iterable. So, say you have this:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1, 5, 3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

This will output:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

If you use p.imap_unordered instead of p.imap, you'll see:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

If you use p.map or p.map_async().get(), you'll see:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

So, the primary reasons to use imap/imap_unordered over map_async are:

  1. Your iterable is large enough that converting it to a list would cause you to run out of/use too much memory.
  2. You want to be able to start processing the results before all of them are completed.

Python Multiprocessing: What's the difference between map and imap?

The laziness is the difference: imap returns an iterator that yields each result as it becomes available, whereas map waits for every result before returning a complete list. One reason you might use imap instead of map is if you want to start processing the first few results without waiting for the rest to be calculated.

As for chunksize, it is sometimes more efficient to dole out work in larger batches, because every time a worker requests more work there is IPC and synchronization overhead.
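
A rough sketch of how you might observe this yourself (work is a hypothetical cheap function; the exact timings are machine-dependent):

import multiprocessing
import time

def work(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        items = range(100000)
        start = time.time()
        list(pool.imap(work, items))  # one IPC round-trip per item
        print("chunksize=1:    {:.2f}s".format(time.time() - start))
        start = time.time()
        list(pool.imap(work, items, chunksize=1000))  # 1000 items per trip
        print("chunksize=1000: {:.2f}s".format(time.time() - start))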

multiprocessing: map vs map_async

There are four choices for mapping jobs to worker processes; you have to consider multi-args, concurrency, blocking, and ordering (summarized in the table below). map and map_async differ only with respect to blocking: map_async is non-blocking, whereas map is blocking.
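
Summarizing those four properties for the Pool methods discussed on this page:

              Multi-args   Concurrency   Blocking   Ordered results
map               no           yes          yes          yes
apply             yes          no           yes          no
map_async         no           yes          no           yes
apply_async       yes          yes          no           no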

So let's say you have a function:

from multiprocessing import Pool
import time

def f(x):
    print(x * x)

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(f, range(10))            # blocks until all 10 calls finish
    r = pool.map_async(f, range(10))  # returns immediately
    # DO STUFF
    print('HERE')
    print('MORE')
    r.wait()                          # block until the async calls finish
    print('DONE')

Example output:

0
1
9
4
16
25
36
49
64
81
0
HERE
1
4
MORE
16
25
36
9
49
64
81
DONE

pool.map(f, range(10)) will wait for all 10 of those function calls to finish, so we see all the prints in a row.
r = pool.map_async(f, range(10)) will execute them asynchronously and only block when r.wait() is called, so we see HERE and MORE in between, but DONE will always be at the end.

multiprocessing.Pool: When to use apply, apply_async or map?

Back in the old days of Python, to call a function with arbitrary arguments, you would use apply:

apply(f, args, kwargs)

apply still exists in Python 2.7, though not in Python 3, and is generally not used anymore. Nowadays,

f(*args, **kwargs)

is preferred. The multiprocessing.Pool module tries to provide a similar interface.

Pool.apply is like Python apply, except that the function call is performed in a separate process. Pool.apply blocks until the function is completed.

Pool.apply_async is also like Python's built-in apply, except that the call returns immediately instead of waiting for the result. An AsyncResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed. Thus, pool.apply(func, args, kwargs) is equivalent to pool.apply_async(func, args, kwargs).get().
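
To make the equivalence concrete, a minimal sketch with a hypothetical add function:

import multiprocessing as mp

def add(a, b):
    return a + b

if __name__ == '__main__':
    with mp.Pool() as pool:
        # These two calls are equivalent; both block until add() returns.
        print(pool.apply(add, (1, 2)))              # 3
        print(pool.apply_async(add, (1, 2)).get())  # 3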

In contrast to Pool.apply, the Pool.apply_async method also has a callback which, if supplied, is called when the function is complete. This can be used instead of calling get().

For example:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x * x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args=(i,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

may yield a result such as

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

Notice that, unlike pool.map, the order of the results may not correspond to the order in which the pool.apply_async calls were made.


So, if you need to run a function in a separate process, but want the current process to block until that function returns, use Pool.apply. Like Pool.apply, Pool.map blocks until the complete result is returned.

If you want the Pool of worker processes to perform many function calls asynchronously, use Pool.apply_async. The order of the results is not guaranteed to be the same as the order of the calls to Pool.apply_async.

Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function).

In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments.
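
A short sketch of that flexibility (double and square are hypothetical functions):

import multiprocessing as mp

def double(x):
    return 2 * x

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool() as pool:
        # apply_async can dispatch different functions to the same pool...
        r1 = pool.apply_async(double, (5,))
        r2 = pool.apply_async(square, (5,))
        print(r1.get(), r2.get())  # 10 25
        # ...while map applies one function to many arguments, in order.
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]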

Python multiprocessing Pool map and imap

Since you already put all your files in a list, you could put them directly into a queue. The queue is then shared with your sub-processes, which take the file names from the queue and do their stuff. There is no need to do it twice (first build a list, then have Pool.imap pickle the list). Pool.imap does exactly the same thing internally, just without you knowing it.

todolist = []
for infile in os.listdir():
    todolist.append(infile)

can be replaced by:

todolist = Queue()
for infile in os.listdir():
    todolist.put(infile)

The complete solution would then look like:

from multiprocessing import Process, Queue
import os

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        # do stuff until inqueue.get returns "STOP":
        # read infile
        # compare things in infile
        # acquire Lock, save things in outfile, release Lock
        # delete infile
        pass

def main():
    nprocesses = 8
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4',
                'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)
        todolist = Queue()
        for infile in os.listdir():
            todolist.put(infile)
        process = [Process(target=process_file, args=(todolist,))
                   for x in range(nprocesses)]
        # tell the processes to stop when all files are handled;
        # one "STOP" sentinel per process sits at the very end of the queue
        for p in process:
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()
        os.chdir("..")  # back to the parent so the next relative chdir works

if __name__ == '__main__':
    main()

Python Multiprocessing.Pool lazy iteration

Let's look at the end of the program first.

The multiprocessing module uses atexit to call multiprocessing.util._exit_function when your program ends.

If you remove g2.next(), your program ends quickly.

The _exit_function eventually calls Pool._terminate_pool. The main thread changes pool._task_handler._state from RUN to TERMINATE. Meanwhile, the pool._task_handler thread is looping in Pool._handle_tasks and bails out when it reaches the condition

if thread._state:
    debug('task handler found thread._state != RUN')
    break

(See /usr/lib/python2.6/multiprocessing/pool.py)

This is what stops the task handler from fully consuming your generator, g(). If you look in Pool._handle_tasks you'll see

for i, task in enumerate(taskseq):
    ...
    try:
        put(task)
    except IOError:
        debug('could not put task on queue')
        break

This is the code which consumes your generator. (taskseq is not exactly your generator, but as taskseq is consumed, so is your generator.)

In contrast, when you call g2.next() the main thread calls IMapIterator.next, and waits when it reaches self._cond.wait(timeout).

The fact that the main thread is waiting instead of calling _exit_function is what allows the task handler thread to run normally, which means fully consuming the generator as it puts tasks in the workers' inqueue in the Pool._handle_tasks function.

The bottom line is that all of the Pool map functions consume the entire iterable they are given. If you'd like to consume the generator in chunks, you could do this instead:

import multiprocessing as mp
import itertools
import time

def g():
    for el in range(50):
        print(el)
        yield el

def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)  # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        # feed the pool N items at a time instead of the whole generator
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)

