Numpy Version of "Exponential Weighted Moving Average", Equivalent to Pandas.Ewm().Mean()

NumPy version of Exponential weighted moving average, equivalent to pandas.ewm().mean()

Updated 08/06/2019

PURE NUMPY, FAST & VECTORIZED SOLUTION FOR LARGE INPUTS

out parameter for in-place computation,
dtype parameter,
index order parameter

This function is equivalent to pandas' ewm(adjust=False).mean(), but much faster. ewm(adjust=True).mean() (the default for pandas) can produce different values at the start of the result. I am working to add the adjust functionality to this solution.

@Divakar's answer leads to floating point precision problems when the input is too large. This is because (1-alpha)**(n+1) -> 0 when n -> inf and alpha -> 1, leading to divide-by-zero's and NaN values popping up in the calculation.

Here is my fastest solution with no precision problems, nearly fully vectorized. It's gotten a little complicated but the performance is great, especially for really huge inputs. Without using in-place calculations (which is possible using the out parameter, saving memory allocation time): 3.62 seconds for 100M element input vector, 3.2ms for a 100K element input vector, and 293µs for a 5000 element input vector on a pretty old PC (results will vary with different alpha/row_size values).

# tested with python3 & numpy 1.15.2
import numpy as np

def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size. Default depends on dtype.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
data = np.array(data, copy=False)

if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float
else:
dtype = np.dtype(dtype)

row_size = int(row_size) if row_size is not None
else get_max_row_size(alpha, dtype)

if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)

if data.ndim > 1:
# flatten input
data = np.reshape(data, -1, order=order)

if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype

row_n = int(data.size // row_size) # the number of rows to use
trailing_n = int(data.size % row_size) # the amount of data leftover
first_offset = data[0]

if trailing_n > 0:
# set temporary results to slice view of out parameter
out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
else:
out_main_view = out
data_main_view = data

# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,
order='C', out=out_main_view)

scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
last_scaling_factor = scaling_factors[-1]

# create offset array
offsets = np.empty(out_main_view.shape[0], dtype=dtype)
offsets[0] = first_offset
# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]

# add the offsets to the result
out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]

if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],
dtype=dtype, order='C', out=out[-trailing_n:])
return out

def get_max_row_size(alpha, dtype=float):
assert 0. <= alpha < 1.
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy's optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
epsilon = np.finfo(dtype).tiny
# If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon)/np.log(1-alpha)) + 1

The 1D ewma function:

def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)

if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)

if data.ndim > 1:
# flatten input
data = data.reshape(-1, order)

if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype

if data.size < 1:
# empty input, return empty array
return out

if offset is None:
offset = data[0]

alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),
dtype=dtype)
# create cumulative sum array
np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],
dtype=dtype, out=out)
np.cumsum(out, dtype=dtype, out=out)

# cumsums / scaling
out /= scaling_factors[-2::-1]

if offset != 0:
offset = np.array(offset, copy=False).astype(dtype, copy=False)
# add offsets
out += offset * scaling_factors[1:]

return out

The 2D ewma function:

def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)

assert data.ndim <= 2

if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)

if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype

if data.size < 1:
# empty input, return empty array
return out

if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
offset = offset[0]
return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,
out=out)

assert -data.ndim <= axis < data.ndim

# create reshaped data views
out_view = out
if axis < 0:
axis = data.ndim - int(axis)

if axis == 0:
# transpose data views so columns are treated as rows
data = data.T
out_view = out_view.T

if offset is None:
# use the first element of each row as the offset
offset = np.copy(data[:, 0])
elif np.size(offset) == 1:
offset = np.reshape(offset, (1,))

alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

# calculate the moving average
row_size = data.shape[1]
row_n = data.shape[0]
scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
dtype=dtype)
# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),
dtype=dtype)
/ scaling_factors[np.newaxis, :-1],
dtype=dtype, out=out_view
)
np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
out_view /= scaling_factors[np.newaxis, -2::-1]

if not (np.size(offset) == 1 and offset == 0):
offset = offset.astype(dtype, copy=False)
# add the offsets to the scaled cumulative sums
out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]

return out

usage:

data_n = 100000000
data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100

span = 5000 # span >= 1
alpha = 2/(span+1) # for pandas` span parameter

# com = 1000 # com >= 0
# alpha = 1/(1+com) # for pandas` center-of-mass parameter

# halflife = 100 # halflife > 0
# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameter

result = ewma_vectorized_safe(data, alpha)

Just a tip

It is easy to calculate a 'window size' (technically exponential averages have infinite 'windows') for a given alpha, dependent on the contribution of the data in that window to the average. This is useful for example to chose how much of the start of the result to treat as unreliable due to border effects.

def window_size(alpha, sum_proportion):
# Increases with increased sum_proportion and decreased alpha
# solve (1-alpha)**window_size = (1-sum_proportion) for window_size
return int(np.log(1-sum_proportion) / np.log(1-alpha))

alpha = 0.02
sum_proportion = .99 # window covers 99% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 227
sum_proportion = .75 # window covers 75% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 68

The alpha = 2 / (window_size + 1.0) relation used in this thread (the 'span' option from pandas) is a very rough approximation of the inverse of the above function (with sum_proportion~=0.87). alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size) is more accurate (the 'half-life' option from pandas equals this formula with sum_proportion=0.5).

In the following example, data represents a continuous noisy signal. cutoff_idx is the first position in result where at least 99% of the value is dependent on separate values in data (i.e. less than 1% depends on data[0]). The data up to cutoff_idx is excluded from the final results because it is too dependent on the first value in data, therefore possibly skewing the average.

result = ewma_vectorized_safe(data, alpha, chunk_size)
sum_proportion = .99
cutoff_idx = window_size(alpha, sum_proportion)
result = result[cutoff_idx:]

To illustrate the problem the above solve you can run this a few times, notice the often-appearing false start of the red line, which is skipped after cutoff_idx:

data_n = 100000
data = np.random.rand(data_n) * 100
window = 1000
sum_proportion = .99
alpha = 1 - np.exp(np.log(1-sum_proportion)/window)

result = ewma_vectorized_safe(data, alpha)

cutoff_idx = window_size(alpha, sum_proportion)
x = np.arange(start=0, stop=result.size)

import matplotlib.pyplot as plt
plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',
x[cutoff_idx:], result[cutoff_idx:], '-b')
plt.show()

note that cutoff_idx==window because alpha was set with the inverse of the window_size() function, with the same sum_proportion.
This is similar to how pandas applies ewm(span=window, min_periods=window).

Pandas Exponential Moving Average (ewm) weight persists through the entire DataSeries?

Generally speaking, when calculating weighted averages, you need more than just the size of the windows because the calculations use data prior to that period.

In your case if you use 5x the size of window_size, you'll get the same last figure.

Full data set: 67108864 rows

values.ewm(span=window_size, min_periods=window_size).mean()

0 NaN
1 NaN
2 NaN
3 NaN
4 NaN
...
67108859 0.477958
67108860 0.479359
67108861 0.478963
67108862 0.479344
67108863 0.476700
Length: 67108864, dtype: float64

Reduced data set: 1280 rows (see the *5 in the slice)

values[nvalues - window_size*5:].ewm(span=window_size, min_periods=window_size).mean()
67107584 NaN
67107585 NaN
67107586 NaN
67107587 NaN
67107588 NaN
...
67108859 0.477958
67108860 0.479360
67108861 0.478963
67108862 0.479344
67108863 0.476700
Length: 1280, dtype: float64

Depending on the accuracy you are looking for, you can cut this back even further

Using latest panda APIs to compute exponential moving average

The ewm() method now has a similar API to moving() and expanding(): you call ewm() and then follow it with a compatible method like mean(). For example:

df=pd.DataFrame({'x':np.random.randn(5)})

df['x'].ewm(halflife=2).mean()

0 -0.442148
1 -0.318170
2 0.099168
3 -0.062827
4 -0.371739
Name: x, dtype: float64

If you try df['x'].ewm() without arguments it will tell you:

Must pass one of com, span, halflife, or alpha

See below for documentation that may be more clear than the link in the OP:

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.ewm.html#pandas.DataFrame.ewm

http://pandas.pydata.org/pandas-docs/stable/computation.html#exponentially-weighted-windows



Related Topics



Leave a reply



Submit