Parallelize apply after pandas groupby
This seems to work, although it really should be built into pandas.
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))
    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))
    print('ideal version (does not work): ')
    # print(df.groupby(df.index).applyParallel(tmpFunc))  # AttributeError: GroupBy has no such method
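If you'd rather avoid the joblib dependency, the same split-apply-combine pattern can be sketched with the standard library's concurrent.futures. This is a stdlib alternative, not the answer's original approach; ThreadPoolExecutor is shown for simplicity, and ProcessPoolExecutor can be swapped in for CPU-bound functions (with func defined at module level so it pickles):

```python
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

def apply_parallel(df_grouped, func, max_workers=4):
    # Map func over each group in parallel, then concat the results
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        results = list(ex.map(func, (group for name, group in df_grouped)))
    return pd.concat(results)

def add_c(df):
    df = df.copy()  # avoid mutating the original group in place
    df['c'] = df.a + df.b
    return df

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
out = apply_parallel(df.groupby(df.index), add_c)
```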
Efficient way to parallelize/speed up pandas groupby transform
You can improve the execution time without using a lambda in .transform. Just use the DataFrameGroupBy.shift() function directly on the GroupBy object, as follows:
df["value_lag1"] = df.groupby(['ID'])["value"].shift(1)
On my machine the execution time drops from 36.6 s to 0.715 s, roughly a 51x speedup. With the lambda in .transform, you are not using pandas' built-in vectorized operations; you are running slow, non-optimized Python code per group. By calling DataFrameGroupBy.shift() directly, the operation stays vectorized and runs much faster.
Result Comparison
Comparing the columns generated by the original and the new code:
df["value_lag1"] = df.groupby(['ID'])["value"].transform(lambda x: x.shift(1))
df["value_lag2"] = df.groupby(['ID'])["value"].shift(1)
df["value_lag1"].compare(df["value_lag2"])
# compare() returns an empty DataFrame, i.e. no differences:
#        self  other
# index
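A minimal, self-contained check of the equivalence, using toy data with the same ID/value column names as the snippet above:

```python
import pandas as pd

df = pd.DataFrame({
    'ID':    ['a', 'a', 'a', 'b', 'b'],
    'value': [1, 2, 3, 10, 20],
})

# Lambda-based transform: a Python-level call per group
lag_transform = df.groupby(['ID'])['value'].transform(lambda x: x.shift(1))

# Direct GroupBy.shift: vectorized, same result
lag_shift = df.groupby(['ID'])['value'].shift(1)

print(lag_transform.equals(lag_shift))  # True
```

Note that the shift happens within each group, so the first row of each ID gets NaN rather than a value leaking in from the previous group.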
Performance Comparison
%%timeit
df["value_lag1"] = df.groupby(['ID'])["value"].transform(lambda x: x.shift(1))
36.6 s ± 768 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
df["value_lag1"] = df.groupby(['ID'])["value"].shift(1)
715 ms ± 64.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
36.6 s vs 0.715 s: roughly 51x faster.
Pandas group by in parallel
Note that I'm still looking for an answer, but it'd probably be better to use a library that handles parallel manipulation of pandas DataFrames, rather than trying to do so manually.
Dask is one option which is intended to scale Pandas operations with little code modification.
Another option (though perhaps a little more difficult to set up) is PySpark.
Parallelize pandas apply
I think going down the route of trying stuff in parallel is probably overcomplicating this. I haven't tried this approach on a large sample, so your mileage may vary, but it should give you an idea...
Let's just start with some dates...
import pandas as pd
dates = pd.to_datetime(['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'])
We'll use some holiday data from pandas.tseries.holiday
- note that in effect we want a DatetimeIndex
from pandas.tseries.holiday import USFederalHolidayCalendar
holiday_calendar = USFederalHolidayCalendar()
holidays = holiday_calendar.holidays('2016-01-01')
This gives us:
DatetimeIndex(['2016-01-01', '2016-01-18', '2016-02-15', '2016-05-30',
'2016-07-04', '2016-09-05', '2016-10-10', '2016-11-11',
'2016-11-24', '2016-12-26',
...
'2030-01-01', '2030-01-21', '2030-02-18', '2030-05-27',
'2030-07-04', '2030-09-02', '2030-10-14', '2030-11-11',
'2030-11-28', '2030-12-25'],
dtype='datetime64[ns]', length=150, freq=None)
Now we find the indices of the next holiday for each of the original dates using searchsorted:
indices = holidays.searchsorted(dates)
# array([1, 6, 9, 3])
next_nearest = holidays[indices]
# DatetimeIndex(['2016-01-18', '2016-10-10', '2016-12-26', '2016-05-30'], dtype='datetime64[ns]', freq=None)
Then take the difference between the two:
next_nearest_diff = pd.to_timedelta(next_nearest.values - dates.values).days
# array([15, 31, 14, 88])
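For the previous holiday, the indices - 1 idea mentioned below can be sketched like this; the np.clip call is my addition (not part of the original answer) to guard the wrap-around at index 0, and the holiday list is a hard-coded excerpt of the 2016 calendar shown above:

```python
import numpy as np
import pandas as pd

dates = pd.to_datetime(['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'])
holidays = pd.to_datetime(['2016-01-01', '2016-01-18', '2016-02-15', '2016-05-30',
                           '2016-07-04', '2016-09-05', '2016-10-10', '2016-11-11',
                           '2016-11-24', '2016-12-26'])

indices = holidays.searchsorted(dates)
# Previous holiday: step back one index, clipped so we never go below 0
prev_indices = np.clip(indices - 1, 0, len(holidays) - 1)
prev_nearest = holidays[prev_indices]
prev_diff = pd.to_timedelta(dates.values - prev_nearest.values).days
# → 2, 4, 18, 17 (days since the previous holiday)
```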
You'll need to be careful about the indices so you don't wrap around, and for the previous date, do the calculation with the indices - 1, but it should act as (I hope) a relatively good base.
Group-by/apply with Pandas and Multiprocessing
There are two main causes of the issues in your code.

The first is the use of Python's built-in sum function. This is a function that takes an iterable of numbers and returns their sum, so if you try to sum a slice of the DataFrame df, you get an error traceback like this:

sum(df.loc[1])

TypeError                                 Traceback (most recent call last)
<ipython-input-60-6dea0ab0880f> in <module>()
----> 1 sum(df.loc[1])
TypeError: unsupported operand type(s) for +: 'int' and 'str'

To fix this, use pandas' own sum method instead:

df.loc[1].sum()
# output
A    2
B    4
C    6
dtype: int64

As you can see, this produces the expected result, i.e. it sums the columns in the data slice.

The second issue is the "reduce" stage. Each process returns a single DataFrame, so the lines
result = sorted(result, key=lambda x: x[0])
return pd.concat([i[1] for i in result])
should be replaced with
return pd.concat(result, axis=1)
Now the code will run without issue given the data being used. The overall code:
import multiprocessing as mp
import pandas as pd
import numpy as np

def _apply_df(args):
    df, func = args
    return df.groupby(level=0).apply(func)

def mp_apply(df, func):
    workers = 4
    pool = mp.Pool(processes=workers)
    split_dfs = np.array_split(df, workers, axis=1)
    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    pool.close()
    # result = sorted(result, key=lambda x: x[0])
    return pd.concat(result, axis=1)

def my_func(x):
    return x.sum()

if __name__ == '__main__':
    df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]],
                      columns=['A', 'B', 'C', 'cluster_id'])
    df = df.set_index('cluster_id')
    out = mp_apply(df, my_func)
    print(out)
Output:
             A   B   C
cluster_id
1            2   4   6
2           11  13  15
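The reduce stage can be sanity-checked without a pool: splitting on axis=1 keeps the full index in every chunk, so concatenating the per-chunk results on axis=1 reassembles all the columns. A sequential sketch of what pool.map plus pd.concat do above:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]],
                  columns=['A', 'B', 'C', 'cluster_id']).set_index('cluster_id')

# Same column-wise split as mp_apply, but mapped sequentially
chunks = np.array_split(df, 3, axis=1)
result = [chunk.groupby(level=0).apply(lambda x: x.sum()) for chunk in chunks]
combined = pd.concat(result, axis=1)
# cluster 1 -> A=2, B=4, C=6; cluster 2 -> A=11, B=13, C=15
```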