How to use multiprocessing pool.map with multiple arguments
The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.1 It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:
import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)
import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.
Passing multiple parameters to pool.map() function in Python
You can use functools.partial for this (as you suspected):
import multiprocessing
from functools import partial

def target(lock, iterable_item):
    for item in iterable_item:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    # Note: a plain multiprocessing.Lock() cannot be pickled and passed
    # to pool workers as an argument; a Manager lock can.
    l = multiprocessing.Manager().Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()
Example:
import multiprocessing
from functools import partial

def f(a, b, c):
    print("{} {} {}".format(a, b, c))

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    a = "hi"
    b = "there"
    func = partial(f, a, b)
    pool.map(func, iterable)
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
Output:
hi there 1
hi there 2
hi there 3
hi there 4
hi there 5
How to use multiprocessing with multiple arguments in Python 3?
Assuming that urlSet is an iterable, you should use

pool.starmap(processURL, zip(urlSet, repeat(user), repeat(verboseFlag)))

This is because you want to iterate over urlSet but pass the same user and verboseFlag to each processURL call (hence repeat).
For reference you should take a look at Python multiprocessing pool.map for multiple arguments
When iterated over, the output of zip should look something like

[('www.google.com', 'user1', True), ('www.google.uk', 'user1', True), ...]

for pool.starmap to make sense of it.
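As a self-contained sketch of this pattern (processURL here is just a stand-in for the asker's real function, and the URLs and user name are made up):

```python
from itertools import repeat
from multiprocessing import Pool

def processURL(url, user, verboseFlag):
    # Stand-in worker: just report what it received.
    return '{} fetched by {} (verbose={})'.format(url, user, verboseFlag)

if __name__ == '__main__':
    urlSet = ['www.google.com', 'www.google.uk']
    user = 'user1'
    verboseFlag = True
    with Pool(processes=2) as pool:
        # zip pairs each URL with an endlessly repeated user and flag
        results = pool.starmap(processURL,
                               zip(urlSet, repeat(user), repeat(verboseFlag)))
    print(results)
```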
How to use multiprocessing pool.starmap with multiple arguments
You have several issues with your code:
- When you iterate the iterator returned by DataFrame.iterrows(), you are passed a tuple t of length 2, where t[0] is the row index and t[1] is a pandas.Series instance. Your worker function, mkpair, will be passed two of these tuples, one from each dataframe, as arguments p1 and p2. But you are calling p1.columns, where p1 is a tuple, and tuples have no such attribute as columns (nor does a pandas.Series). So this should have raised an altogether different exception, and I don't see how you are getting the exception you claim from the actual code you posted. Moreover, your statement pool = Pool(process=4) is incorrect: the correct keyword argument is processes, not process. So you could not possibly be executing this code (I will overlook the missing import statements, which I assume you actually have).
- You are creating a progress bar that is iterated upon submission of the tasks to the multiprocessing pool. So even if your worker function took a second to complete (and were coded correctly), the progress bar would practically instantaneously shoot up to 100% as the tasks are submitted. The only progress you are measuring is task submission, not completion.
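To make the first point concrete, here is a minimal illustration (with a made-up one-row DataFrame) of what iterrows() actually yields:

```python
import pandas as pd

df = pd.DataFrame([[1, 2, 3]], columns=['x', 'y', 'z'])
t = next(df.iterrows())   # iterrows() yields (index, Series) tuples
print(len(t))             # 2
print(t[0])               # 0 -- the row index
print(type(t[1]))         # <class 'pandas.core.series.Series'>
```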
What you need to do, assuming you want the progress bar to advance as tasks are completed, is to use a method such as imap_unordered, or apply_async with a callback, either of which allows you to update the bar as tasks complete. If you want to store the results in a list in task-submission order rather than in completion order and you are using imap_unordered, then you need to pass an index to the worker function, which it returns back along with the result. The code is simpler if you use apply_async, but this method does not allow you to submit tasks in "chunks", which becomes an issue if the total number of tasks is very large (see the chunksize argument for method imap_unordered). Here is how you would use each method:
Using imap_unordered
from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)
pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(t: tuple) -> tuple:
    from time import sleep
    sleep(1)
    # unpack tuple:
    idx, rows = t
    # unpack again:
    row1, row2 = rows
    series1 = row1[1]
    series2 = row2[1]
    return idx, sum(series1[column] * series2[column] for column in columns)

pool = Pool(processes=4)
results = [0] * pairs_len
with tqdm(total=pairs_len) as pbar:
    for idx, total in pool.imap_unordered(mkpair, enumerate(pairs)):
        results[idx] = total
        pbar.update()
pool.close()
pool.join()
print(results)
Using apply_async
from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)
pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(row1, row2) -> int:
    from time import sleep
    sleep(1)
    series1 = row1[1]
    series2 = row2[1]
    return sum(series1[column] * series2[column] for column in columns)

def my_callback(_):
    pbar.update()

pool = Pool(processes=4)
with tqdm(total=pairs_len) as pbar:
    async_futures = [pool.apply_async(mkpair, args=(row1, row2), callback=my_callback)
                     for row1, row2 in pairs]
    results = [async_future.get() for async_future in async_futures]
pool.close()
pool.join()
print(results)
Python Using List/Multiple Arguments in Pool Map
You should define your worker function before creating the Pool. The worker processes are forked at the point where the Pool is created; they do not execute any code beyond that line, and therefore never see your worker function.

Also, you should replace pool.map with pool.starmap to fit your input.
A simplified example:
from multiprocessing import Pool

def co_refresh(a, b, c, d):
    print(a, b, c, d)

input_list = [f'a{i} b{i} c{i} d{i}'.split() for i in range(4)]
# [['a0', 'b0', 'c0', 'd0'], ['a1', 'b1', 'c1', 'd1'], ['a2', 'b2', 'c2', 'd2'], ['a3', 'b3', 'c3', 'd3']]

pool = Pool(processes=3)
pool.starmap(co_refresh, input_list)
pool.close()
How to use multiprocessing.Pool.map with an iterator and separate input for the function
You could consider something like this (there are probably better ways):
from multiprocessing import Pool

def myfunc(a, b, c):
    print(a, b, c)

df_A = 1
df_B = 2

def main():
    with Pool() as pool:
        pool.starmap(myfunc, [[df_A, df_B, x] for x in range(1, 6)])

if __name__ == '__main__':
    main()
Output:
1 2 1
1 2 2
1 2 3
1 2 4
1 2 5
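One of the "better ways" alluded to above: since only the last argument varies, the same result can be sketched with functools.partial and plain pool.map (reusing the toy myfunc from the example):

```python
from functools import partial
from multiprocessing import Pool

def myfunc(a, b, c):
    print(a, b, c)

df_A = 1
df_B = 2

def main():
    with Pool() as pool:
        # Freeze a and b; map then only varies c.
        pool.map(partial(myfunc, df_A, df_B), range(1, 6))

if __name__ == '__main__':
    main()
```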