How to Use Multiprocessing Pool.Map With Multiple Arguments

How to use multiprocessing pool.map with multiple arguments

The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.¹ It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
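
If it helps to see what starmap is doing, itertools offers a single-process analogue with the same unpacking semantics; this sketch (reusing merge_names and names from above) builds the same list without a pool:

from itertools import product, starmap

# Same result as pool.starmap above, computed serially in one process:
results = list(starmap(merge_names, product(names, repeat=2)))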

For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

In simpler cases, with a fixed second argument, you can also use functools.partial, but only in Python 2.7+ (earlier versions cannot pickle partial objects).

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

¹ Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.

Passing multiple parameters to pool.map() function in Python

You can use functools.partial for this (as you suspected):

import multiprocessing
from functools import partial

def target(lock, iterable_item):
    for item in iterable_item:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    # Note: a plain multiprocessing.Lock() can't be pickled into pool.map
    # arguments; a manager-backed lock can, so use that instead:
    l = multiprocessing.Manager().Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

Example:

import multiprocessing
from functools import partial

def f(a, b, c):
    print("{} {} {}".format(a, b, c))

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    a = "hi"
    b = "there"
    func = partial(f, a, b)
    pool.map(func, iterable)
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Output:

hi there 1
hi there 2
hi there 3
hi there 4
hi there 5

How to use multiprocessing with multiple arguments in Python 3?

Assuming that urlSet is an iterable, you should use

pool.starmap(processURL, zip(urlSet, repeat(user), repeat(verboseFlag)))

This is because you want to iterate over urlSet but keep the same user and verboseFlag for every processURL call (hence itertools.repeat).

For reference, take a look at Python multiprocessing pool.map for multiple arguments.

When iterated over, the output of zip should look something like

[('www.google.com', 'user1', True), ('www.google.uk', 'user1', True), ...]

for pool.starmap to make sense of it.
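
Put together, a minimal runnable sketch of the pattern (the body of processURL here is a hypothetical stand-in; only the zip/repeat call shape is the point):

from itertools import repeat
from multiprocessing import Pool

def processURL(url, user, verbose):
    # Hypothetical body -- just echo the arguments.
    if verbose:
        print('fetching {} as {}'.format(url, user))
    return url

if __name__ == '__main__':
    urlSet = ['www.google.com', 'www.google.uk']
    user = 'user1'
    verboseFlag = True
    with Pool(processes=2) as pool:
        results = pool.starmap(processURL, zip(urlSet, repeat(user), repeat(verboseFlag)))
    print(results)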

How to use multiprocessing pool.starmap with multiple arguments

You have several issues with your code:

  1. When you iterate the iterator returned by pandas.DataFrame.iterrows(), you get a tuple t of length 2, where t[0] is the row index and t[1] is a pandas.Series instance (see the short check after this list). Your worker function, mkpair, is passed two of these tuples, one from each dataframe, as arguments p1 and p2. But you call p1.columns, where p1 is a tuple, and tuples have no columns attribute (nor does a pandas.Series), so this should have raised an altogether different exception; I don't see how you are getting the exception you claim from the actual code you posted. Moreover, your statement pool = Pool(process=4) is incorrect, since the correct keyword argument is processes, not process. So you could not possibly be executing this code (I will overlook the missing import statements, which I assume you actually have).
  2. You are creating a progress bar that is updated as tasks are submitted to the multiprocessing pool, not as they complete. So even if your worker function took a second to complete (and were coded correctly), the progress bar would shoot up to 100% practically instantaneously as the tasks are submitted. The only progress you are measuring is task submission, not completion.
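
A quick, self-contained way to confirm the tuple shape described in point 1:

import pandas as pd

df = pd.DataFrame([[1, 2, 3]], columns=['x', 'y', 'z'])
t = next(df.iterrows())
print(type(t).__name__, t[0], type(t[1]).__name__)
# prints: tuple 0 Series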

What you need to do, assuming you want the progress bar to advance as tasks complete, is to use a method such as imap_unordered, or apply_async with a callback, either of which lets you update the bar as each task finishes. If you want to store the results in a list in task-submission order rather than completion order while using imap_unordered, you need to pass an index to the worker function, which it then returns along with the result. The code is simpler with apply_async, but that method does not allow you to submit tasks in "chunks", which becomes an issue when the total number of tasks being submitted is very large (see the chunksize argument of imap_unordered). Here is how you would use each method:

Using imap_unordered

from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(t: tuple) -> tuple:
    from time import sleep

    sleep(1)
    # unpack the (idx, (row1, row2)) tuple:
    idx, rows = t
    # unpack again into the two iterrows() tuples:
    row1, row2 = rows
    series1 = row1[1]
    series2 = row2[1]
    return idx, sum(series1[column] * series2[column] for column in columns)

pool = Pool(processes=4)
results = [0] * pairs_len
with tqdm(total=pairs_len) as pbar:
    for idx, total in pool.imap_unordered(mkpair, enumerate(pairs)):
        results[idx] = total
        pbar.update()
pool.close()
pool.join()
print(results)
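
If pairs_len were very large, you could reduce inter-process overhead by batching submissions with the chunksize argument mentioned above; only the loop line changes:

for idx, total in pool.imap_unordered(mkpair, enumerate(pairs), chunksize=10):
    results[idx] = total
    pbar.update()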

Using apply_async

from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(row1, row2) -> int:
    from time import sleep

    sleep(1)
    series1 = row1[1]
    series2 = row2[1]
    return sum(series1[column] * series2[column] for column in columns)

def my_callback(_):
    pbar.update()

pool = Pool(processes=4)
with tqdm(total=pairs_len) as pbar:
    async_futures = [pool.apply_async(mkpair, args=(row1, row2), callback=my_callback)
                     for row1, row2 in pairs]
    results = [async_future.get() for async_future in async_futures]
pool.close()
pool.join()
print(results)

Python Using List/Multiple Arguments in Pool Map

You should define your worker function before creating the Pool. The worker processes are forked at the point where the Pool is created, and they do not execute any code beyond that line, so they never see a function defined after it.

Also, replace pool.map with pool.starmap to fit your input: starmap unpacks each inner list into separate arguments.

A simplified example:

from multiprocessing import Pool

def co_refresh(a, b, c, d):
    print(a, b, c, d)

input_list = [f'a{i} b{i} c{i} d{i}'.split() for i in range(4)]
# [['a0', 'b0', 'c0', 'd0'], ['a1', 'b1', 'c1', 'd1'], ['a2', 'b2', 'c2', 'd2'], ['a3', 'b3', 'c3', 'd3']]

pool = Pool(processes=3)
pool.starmap(co_refresh, input_list)
pool.close()
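
To see why the definition order matters, here is a sketch of the broken arrangement (assuming the default fork start method on Linux, where the workers are forked the moment Pool is created): a function defined after that point doesn't exist in the workers, so the tasks fail there, typically with an AttributeError while unpickling, instead of printing anything.

from multiprocessing import Pool

pool = Pool(processes=3)  # workers are forked here ...

def co_refresh(a, b, c, d):  # ... so they never see this definition
    print(a, b, c, d)

# Fails in the workers: they have no co_refresh to unpickle the task against.
pool.starmap(co_refresh, [['a0', 'b0', 'c0', 'd0']])
pool.close()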

How to use multiprocessing.Pool.map with an iterator and separate input for the function

You could consider something like this (there are probably better ways):

from multiprocessing import Pool

def myfunc(a, b, c):
    print(a, b, c)

df_A = 1
df_B = 2

def main():
    with Pool() as pool:
        pool.starmap(myfunc, [[df_A, df_B, x] for x in range(1, 6)])

if __name__ == '__main__':
    main()

Output:

1 2 1
1 2 2
1 2 3
1 2 4
1 2 5

