Parallelize Apply After Pandas Groupby

Parallelize apply after pandas groupby

This seems to work, although it really should be built into pandas.

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    # Run func on each group in a separate process, then stitch the results back together
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))

    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))

    print('ideal version (does not work): ')
    print(df.groupby(df.index).applyParallel(tmpFunc))
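
If you'd rather avoid the joblib dependency, a minimal sketch of the same idea using only the standard library's concurrent.futures could look like this (the function name apply_parallel below is just illustrative, not part of the original answer):

from concurrent.futures import ProcessPoolExecutor
import pandas as pd

def apply_parallel(df_grouped, func):
    # Submit each group to a worker process, then concatenate the results.
    # func must be picklable, e.g. a module-level function such as tmpFunc above.
    with ProcessPoolExecutor() as executor:
        results = executor.map(func, (group for name, group in df_grouped))
    return pd.concat(list(results))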

Efficient way to parallelize/speed up pandas groupby transform

You can improve the execution time by not using a lambda function with .transform. Just use the DataFrameGroupBy.shift() function directly on the GroupBy object, as follows:

df["value_lag1"] = df.groupby(['ID'])["value"].shift(1)

The execution time of the original version vs. this version on my machine is 36.6 s vs. 0.715 s, roughly 51× faster.

With the lambda function in .transform, you are not using the built-in vectorized Pandas operation; instead, a slow Python-level call is made for every group. By using the DataFrameGroupBy.shift() function directly, your code stays vectorized and runs much faster.
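
As a small, self-contained illustration (the DataFrame below is made up; only the column names 'ID' and 'value' come from the code above), both approaches produce the same lag column:

import numpy as np
import pandas as pd

# Toy data: 1,000 groups with 100 rows each (purely illustrative)
df = pd.DataFrame({
    'ID': np.repeat(np.arange(1000), 100),
    'value': np.random.rand(100_000),
})

# Slow: the lambda is called once per group at the Python level
df["value_lag1"] = df.groupby(['ID'])["value"].transform(lambda x: x.shift(1))

# Fast: the shift is applied to the whole GroupBy object at once
df["value_lag2"] = df.groupby(['ID'])["value"].shift(1)

# Both columns are identical
assert df["value_lag1"].equals(df["value_lag2"])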

Result Comparison

Two columns generated by the original and the new code:

df["value_lag1"] = df.groupby(['ID'])["value"].transform(lambda x: x.shift(1))

df["value_lag2"] = df.groupby(['ID'])["value"].shift(1)

df["value_lag1"].compare(df["value_lag2"])

# No differences shown by the compare function (the result is empty):

      self other
index

Performance Comparison

%%timeit
df["value_lag1"] = df.groupby(['ID'])["value"].transform(lambda x: x.shift(1))

36.6 s ± 768 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
df["value_lag1"] = df.groupby(['ID'])["value"].shift(1)

715 ms ± 64.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

36.6 s vs 0.715 s: roughly 51× faster.

pandas group by in parallel

Not that I'm still looking for an answer, but it'd probably be better to use a library that handles parallel manipulation of pandas DataFrames rather than trying to do so manually.

Dask is one option which is intended to scale Pandas operations with little code modification.
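
For example, a minimal Dask sketch (the column names and partition count below are just placeholders) looks almost identical to plain pandas:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'ID': [1, 1, 2, 2], 'value': [10, 20, 30, 40]})

# Split the pandas DataFrame into partitions that Dask can process in parallel
ddf = dd.from_pandas(pdf, npartitions=2)

# The groupby/aggregation API mirrors pandas; .compute() triggers the parallel work
result = ddf.groupby('ID')['value'].sum().compute()
print(result)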

Another option (though maybe a little more difficult to set up) is PySpark.

Parallelize pandas apply

I think going down the route of trying things in parallel is probably overcomplicating this. I haven't tried this approach on a large sample, so your mileage may vary, but it should give you an idea...

Let's just start with some dates...

import pandas as pd

dates = pd.to_datetime(['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'])

We'll use some holiday data from pandas.tseries.holiday - note that in effect we want a DatetimeIndex...

from pandas.tseries.holiday import USFederalHolidayCalendar

holiday_calendar = USFederalHolidayCalendar()
holidays = holiday_calendar.holidays('2016-01-01')

This gives us:

DatetimeIndex(['2016-01-01', '2016-01-18', '2016-02-15', '2016-05-30',
'2016-07-04', '2016-09-05', '2016-10-10', '2016-11-11',
'2016-11-24', '2016-12-26',
...
'2030-01-01', '2030-01-21', '2030-02-18', '2030-05-27',
'2030-07-04', '2030-09-02', '2030-10-14', '2030-11-11',
'2030-11-28', '2030-12-25'],
dtype='datetime64[ns]', length=150, freq=None)

Now we find the indices of the next nearest holiday for the original dates using searchsorted:

indices = holidays.searchsorted(dates)
# array([1, 6, 9, 3])
next_nearest = holidays[indices]
# DatetimeIndex(['2016-01-18', '2016-10-10', '2016-12-26', '2016-05-30'], dtype='datetime64[ns]', freq=None)

Then take the difference between the two:

next_nearest_diff = pd.to_timedelta(next_nearest.values - dates.values).days
# array([15, 31, 14, 88])

You'll need to be careful with the indices so you don't wrap around, and for the previous holiday, do the calculation with indices - 1, but it should act as (I hope) a relatively good base.
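
A rough sketch of handling those edge cases, reusing the dates, holidays and indices defined above (the result variable names are my own):

import numpy as np

# Clamp the indices so neither lookup runs off the ends of the holiday index
next_idx = np.clip(indices, 0, len(holidays) - 1)
prev_idx = np.clip(indices - 1, 0, len(holidays) - 1)

# Days until the next holiday and days since the previous one
days_to_next = pd.to_timedelta(holidays[next_idx].values - dates.values).days
days_since_prev = pd.to_timedelta(dates.values - holidays[prev_idx].values).days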

group-by/apply with Pandas and Multiprocessing

There are 2 main causes of issues in your code:

  1. The use of Python's built-in sum function. This is a function that takes an iterable of numbers and returns their sum.
    e.g. if you try to sum a slice of the dataframe df, you will get the same error traceback:

sum(df.loc[1])

TypeError                                 Traceback (most recent call last)
<ipython-input-60-6dea0ab0880f> in <module>()
----> 1 sum(df.loc[1])
TypeError: unsupported operand type(s) for +: 'int' and 'str'

To fix this, you'll need to use the pandas sum method, as shown below:

df.loc[1].sum()

# output
A    2
B    4
C    6
dtype: int64

As you can see, this produces the expected result, i.e. it sums the columns in the data slice.


  2. The second issue is the "reduce" stage. Each process will return a single dataframe, and the lines

    result = sorted(result, key=lambda x: x[0])

    return pd.concat([i[1] for i in result])

The first line will produce an error because none of the results has a column called 0; there is a similar issue with the second line. This can be resolved as follows:

return pd.concat(result, axis=1)

Now the code will run without issue given the data being used.

The overall code:

import multiprocessing as mp
import pandas as pd
import numpy as np

def _apply_df(args):
    df, func = args
    return df.groupby(level=0).apply(func)

def mp_apply(df, func):
    workers = 4
    pool = mp.Pool(processes=workers)
    # Split the DataFrame column-wise; every chunk still contains all the groups
    split_dfs = np.array_split(df, workers, axis=1)
    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    pool.close()
    #result = sorted(result, key=lambda x: x[0])
    return pd.concat(result, axis=1)

def my_func(x):
    return x.sum()

if __name__ == '__main__':
    df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
    df = df.set_index('cluster_id')
    out = mp_apply(df, my_func)
    print(out)

Output:

             A   B   C
cluster_id
1            2   4   6
2           11  13  15
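
As a variation, if you'd rather hand whole groups (rather than column chunks) to each worker, a minimal sketch along the same lines could look like this (mp_apply_by_group and _apply_group are names I made up; it assumes the same df and my_func as above and should be called from under the if __name__ == '__main__' guard):

import multiprocessing as mp
import pandas as pd

def _apply_group(args):
    # Apply func to a single group and keep its name so the result can be re-indexed
    name, group, func = args
    return name, func(group)

def mp_apply_by_group(df, func, workers=4):
    tasks = [(name, group, func) for name, group in df.groupby(level=0)]
    with mp.Pool(processes=workers) as pool:
        results = pool.map(_apply_group, tasks)
    names, rows = zip(*results)
    # Each row is the per-group Series returned by func; names become the index
    return pd.DataFrame(list(rows), index=list(names))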

