diff --git a/doc/source/reference/window.rst b/doc/source/reference/window.rst index 5e230a533625f..0be3184a9356c 100644 --- a/doc/source/reference/window.rst +++ b/doc/source/reference/window.rst @@ -88,6 +88,7 @@ Exponentially-weighted window functions :toctree: api/ ExponentialMovingWindow.mean + ExponentialMovingWindow.sum ExponentialMovingWindow.std ExponentialMovingWindow.var ExponentialMovingWindow.corr diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 2a3049895a390..daf0d0d000079 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -128,7 +128,7 @@ Other enhancements - Added support for nullable boolean and integer types in :meth:`DataFrame.to_stata`, :class:`~pandas.io.stata.StataWriter`, :class:`~pandas.io.stata.StataWriter117`, and :class:`~pandas.io.stata.StataWriterUTF8` (:issue:`40855`) - :meth:`DataFrame.__pos__`, :meth:`DataFrame.__neg__` now retain ``ExtensionDtype`` dtypes (:issue:`43883`) - The error raised when an optional dependency can't be imported now includes the original exception, for easier investigation (:issue:`43882`) -- +- Added :meth:`.ExponentialMovingWindow.sum` (:issue:`13297`) .. --------------------------------------------------------------------------- diff --git a/pandas/_libs/window/aggregations.pyi b/pandas/_libs/window/aggregations.pyi index 879809a259266..f3317ff5a60be 100644 --- a/pandas/_libs/window/aggregations.pyi +++ b/pandas/_libs/window/aggregations.pyi @@ -100,7 +100,7 @@ def roll_weighted_var( minp: int, # int64_t ddof: int, # unsigned int ) -> np.ndarray: ... # np.ndarray[np.float64] -def ewma( +def ewm( vals: np.ndarray, # const float64_t[:] start: np.ndarray, # const int64_t[:] end: np.ndarray, # const int64_t[:] @@ -109,6 +109,7 @@ def ewma( adjust: bool, ignore_na: bool, deltas: np.ndarray, # const float64_t[:] + normalize: bool, ) -> np.ndarray: ... # np.ndarray[np.float64] def ewmcov( input_x: np.ndarray, # const float64_t[:] diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 29fe20090875b..1941a3c4a37f0 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1604,13 +1604,13 @@ def roll_weighted_var(const float64_t[:] values, const float64_t[:] weights, # ---------------------------------------------------------------------- -# Exponentially weighted moving average +# Exponentially weighted moving -def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, - int minp, float64_t com, bint adjust, bint ignore_na, - const float64_t[:] deltas=None) -> np.ndarray: +def ewm(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, + int minp, float64_t com, bint adjust, bint ignore_na, + const float64_t[:] deltas=None, bint normalize=True) -> np.ndarray: """ - Compute exponentially-weighted moving average using center-of-mass. + Compute exponentially-weighted moving average or sum using center-of-mass. Parameters ---------- @@ -1623,6 +1623,8 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, ignore_na : bool deltas : ndarray (float64 type), optional. If None, implicitly assumes equally spaced points (used when `times` is not passed) + normalize : bool, optional. + If True, calculate the mean. If False, calculate the sum. Returns ------- @@ -1634,7 +1636,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, const float64_t[:] sub_vals const float64_t[:] sub_deltas=None ndarray[float64_t] sub_output, output = np.empty(N, dtype=np.float64) - float64_t alpha, old_wt_factor, new_wt, weighted_avg, old_wt, cur + float64_t alpha, old_wt_factor, new_wt, weighted, old_wt, cur bint is_observation, use_deltas if N == 0: @@ -1657,10 +1659,10 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, win_size = len(sub_vals) sub_output = np.empty(win_size, dtype=np.float64) - weighted_avg = sub_vals[0] - is_observation = weighted_avg == weighted_avg + weighted = sub_vals[0] + is_observation = weighted == weighted nobs = int(is_observation) - sub_output[0] = weighted_avg if nobs >= minp else NaN + sub_output[0] = weighted if nobs >= minp else NaN old_wt = 1. with nogil: @@ -1668,37 +1670,38 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, cur = sub_vals[i] is_observation = cur == cur nobs += is_observation - if weighted_avg == weighted_avg: + if weighted == weighted: if is_observation or not ignore_na: - if use_deltas: - old_wt *= old_wt_factor ** sub_deltas[i - 1] + if normalize: + if use_deltas: + old_wt *= old_wt_factor ** sub_deltas[i - 1] + else: + old_wt *= old_wt_factor else: - old_wt *= old_wt_factor + weighted = old_wt_factor * weighted if is_observation: - - # avoid numerical errors on constant series - if weighted_avg != cur: - weighted_avg = ((old_wt * weighted_avg) + - (new_wt * cur)) / (old_wt + new_wt) - if adjust: - old_wt += new_wt + if normalize: + # avoid numerical errors on constant series + if weighted != cur: + weighted = old_wt * weighted + new_wt * cur + weighted /= (old_wt + new_wt) + if adjust: + old_wt += new_wt + else: + old_wt = 1. else: - old_wt = 1. + weighted += cur elif is_observation: - weighted_avg = cur + weighted = cur - sub_output[i] = weighted_avg if nobs >= minp else NaN + sub_output[i] = weighted if nobs >= minp else NaN output[s:e] = sub_output return output -# ---------------------------------------------------------------------- -# Exponentially weighted moving covariance - - def ewmcov(const float64_t[:] input_x, const int64_t[:] start, const int64_t[:] end, int minp, const float64_t[:] input_y, float64_t com, bint adjust, bint ignore_na, bint bias) -> np.ndarray: diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 29a6704ae5092..d769f846b3bdc 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -46,8 +46,8 @@ window_agg_numba_parameters, ) from pandas.core.window.numba_ import ( - generate_ewma_numba_table_func, - generate_numba_ewma_func, + generate_numba_ewm_func, + generate_numba_ewm_table_func, ) from pandas.core.window.online import ( EWMMeanState, @@ -469,17 +469,21 @@ def aggregate(self, func, *args, **kwargs): def mean(self, *args, engine=None, engine_kwargs=None, **kwargs): if maybe_use_numba(engine): if self.method == "single": - ewma_func = generate_numba_ewma_func( - engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas - ) - numba_cache_key = (lambda x: x, "ewma") + func = generate_numba_ewm_func + numba_cache_key = (lambda x: x, "ewm_mean") else: - ewma_func = generate_ewma_numba_table_func( - engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas - ) - numba_cache_key = (lambda x: x, "ewma_table") + func = generate_numba_ewm_table_func + numba_cache_key = (lambda x: x, "ewm_mean_table") + ewm_func = func( + engine_kwargs=engine_kwargs, + com=self._com, + adjust=self.adjust, + ignore_na=self.ignore_na, + deltas=self._deltas, + normalize=True, + ) return self._apply( - ewma_func, + ewm_func, numba_cache_key=numba_cache_key, ) elif engine in ("cython", None): @@ -489,11 +493,68 @@ def mean(self, *args, engine=None, engine_kwargs=None, **kwargs): deltas = None if self.times is None else self._deltas window_func = partial( - window_aggregations.ewma, + window_aggregations.ewm, + com=self._com, + adjust=self.adjust, + ignore_na=self.ignore_na, + deltas=deltas, + normalize=True, + ) + return self._apply(window_func) + else: + raise ValueError("engine must be either 'numba' or 'cython'") + + @doc( + template_header, + create_section_header("Parameters"), + args_compat, + window_agg_numba_parameters, + kwargs_compat, + create_section_header("Returns"), + template_returns, + create_section_header("See Also"), + template_see_also, + create_section_header("Notes"), + numba_notes.replace("\n", "", 1), + window_method="ewm", + aggregation_description="(exponential weighted moment) sum", + agg_method="sum", + ) + def sum(self, *args, engine=None, engine_kwargs=None, **kwargs): + if not self.adjust: + raise NotImplementedError("sum is not implemented with adjust=False") + if maybe_use_numba(engine): + if self.method == "single": + func = generate_numba_ewm_func + numba_cache_key = (lambda x: x, "ewm_sum") + else: + func = generate_numba_ewm_table_func + numba_cache_key = (lambda x: x, "ewm_sum_table") + ewm_func = func( + engine_kwargs=engine_kwargs, + com=self._com, + adjust=self.adjust, + ignore_na=self.ignore_na, + deltas=self._deltas, + normalize=False, + ) + return self._apply( + ewm_func, + numba_cache_key=numba_cache_key, + ) + elif engine in ("cython", None): + if engine_kwargs is not None: + raise ValueError("cython engine does not accept engine_kwargs") + nv.validate_window_func("sum", args, kwargs) + + deltas = None if self.times is None else self._deltas + window_func = partial( + window_aggregations.ewm, com=self._com, adjust=self.adjust, ignore_na=self.ignore_na, deltas=deltas, + normalize=False, ) return self._apply(window_func) else: diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index ab1eb9d3a2688..f41711a4d1f19 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -80,15 +80,16 @@ def roll_apply( return roll_apply -def generate_numba_ewma_func( +def generate_numba_ewm_func( engine_kwargs: dict[str, bool] | None, com: float, adjust: bool, ignore_na: bool, deltas: np.ndarray, + normalize: bool, ): """ - Generate a numba jitted ewma function specified by values + Generate a numba jitted ewm mean or sum function specified by values from engine_kwargs. Parameters @@ -99,6 +100,7 @@ def generate_numba_ewma_func( adjust : bool ignore_na : bool deltas : numpy.ndarray + normalize : bool Returns ------- @@ -106,14 +108,15 @@ def generate_numba_ewma_func( """ nopython, nogil, parallel = get_jit_arguments(engine_kwargs) - cache_key = (lambda x: x, "ewma") + str_key = "ewm_mean" if normalize else "ewm_sum" + cache_key = (lambda x: x, str_key) if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] numba = import_optional_dependency("numba") @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) - def ewma( + def ewm( values: np.ndarray, begin: np.ndarray, end: np.ndarray, @@ -130,43 +133,47 @@ def ewma( window = values[start:stop] sub_result = np.empty(len(window)) - weighted_avg = window[0] - nobs = int(not np.isnan(weighted_avg)) - sub_result[0] = weighted_avg if nobs >= minimum_periods else np.nan + weighted = window[0] + nobs = int(not np.isnan(weighted)) + sub_result[0] = weighted if nobs >= minimum_periods else np.nan old_wt = 1.0 for j in range(1, len(window)): cur = window[j] is_observation = not np.isnan(cur) nobs += is_observation - if not np.isnan(weighted_avg): + if not np.isnan(weighted): if is_observation or not ignore_na: - - # note that len(deltas) = len(vals) - 1 and deltas[i] is to be - # used in conjunction with vals[i+1] - old_wt *= old_wt_factor ** deltas[start + j - 1] + if normalize: + # note that len(deltas) = len(vals) - 1 and deltas[i] + # is to be used in conjunction with vals[i+1] + old_wt *= old_wt_factor ** deltas[start + j - 1] + else: + weighted = old_wt_factor * weighted if is_observation: - - # avoid numerical errors on constant series - if weighted_avg != cur: - weighted_avg = ( - (old_wt * weighted_avg) + (new_wt * cur) - ) / (old_wt + new_wt) - if adjust: - old_wt += new_wt + if normalize: + # avoid numerical errors on constant series + if weighted != cur: + weighted = old_wt * weighted + new_wt * cur + if normalize: + weighted = weighted / (old_wt + new_wt) + if adjust: + old_wt += new_wt + else: + old_wt = 1.0 else: - old_wt = 1.0 + weighted += cur elif is_observation: - weighted_avg = cur + weighted = cur - sub_result[j] = weighted_avg if nobs >= minimum_periods else np.nan + sub_result[j] = weighted if nobs >= minimum_periods else np.nan result[start:stop] = sub_result return result - return ewma + return ewm def generate_numba_table_func( @@ -252,15 +259,16 @@ def nan_agg_with_axis(table): return nan_agg_with_axis -def generate_ewma_numba_table_func( +def generate_numba_ewm_table_func( engine_kwargs: dict[str, bool] | None, com: float, adjust: bool, ignore_na: bool, deltas: np.ndarray, + normalize: bool, ): """ - Generate a numba jitted ewma function applied table wise specified + Generate a numba jitted ewm mean or sum function applied table wise specified by values from engine_kwargs. Parameters @@ -271,6 +279,7 @@ def generate_ewma_numba_table_func( adjust : bool ignore_na : bool deltas : numpy.ndarray + normalize: bool Returns ------- @@ -278,14 +287,15 @@ def generate_ewma_numba_table_func( """ nopython, nogil, parallel = get_jit_arguments(engine_kwargs) - cache_key = (lambda x: x, "ewma_table") + str_key = "ewm_mean_table" if normalize else "ewm_sum_table" + cache_key = (lambda x: x, str_key) if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] numba = import_optional_dependency("numba") @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) - def ewma_table( + def ewm_table( values: np.ndarray, begin: np.ndarray, end: np.ndarray, @@ -297,35 +307,42 @@ def ewma_table( old_wt = np.ones(values.shape[1]) result = np.empty(values.shape) - weighted_avg = values[0].copy() - nobs = (~np.isnan(weighted_avg)).astype(np.int64) - result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) + weighted = values[0].copy() + nobs = (~np.isnan(weighted)).astype(np.int64) + result[0] = np.where(nobs >= minimum_periods, weighted, np.nan) for i in range(1, len(values)): cur = values[i] is_observations = ~np.isnan(cur) nobs += is_observations.astype(np.int64) for j in numba.prange(len(cur)): - if not np.isnan(weighted_avg[j]): + if not np.isnan(weighted[j]): if is_observations[j] or not ignore_na: - - # note that len(deltas) = len(vals) - 1 and deltas[i] is to be - # used in conjunction with vals[i+1] - old_wt[j] *= old_wt_factor ** deltas[i - 1] + if normalize: + # note that len(deltas) = len(vals) - 1 and deltas[i] + # is to be used in conjunction with vals[i+1] + old_wt[j] *= old_wt_factor ** deltas[i - 1] + else: + weighted[j] = old_wt_factor * weighted[j] if is_observations[j]: - # avoid numerical errors on constant series - if weighted_avg[j] != cur[j]: - weighted_avg[j] = ( - (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j]) - ) / (old_wt[j] + new_wt) - if adjust: - old_wt[j] += new_wt + if normalize: + # avoid numerical errors on constant series + if weighted[j] != cur[j]: + weighted[j] = ( + old_wt[j] * weighted[j] + new_wt * cur[j] + ) + if normalize: + weighted[j] = weighted[j] / (old_wt[j] + new_wt) + if adjust: + old_wt[j] += new_wt + else: + old_wt[j] = 1.0 else: - old_wt[j] = 1.0 + weighted[j] += cur[j] elif is_observations[j]: - weighted_avg[j] = cur[j] + weighted[j] = cur[j] - result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) + result[i] = np.where(nobs >= minimum_periods, weighted, np.nan) return result - return ewma_table + return ewm_table diff --git a/pandas/tests/window/test_ewm.py b/pandas/tests/window/test_ewm.py index 5579444f99bbb..4cb5d0342572b 100644 --- a/pandas/tests/window/test_ewm.py +++ b/pandas/tests/window/test_ewm.py @@ -241,3 +241,22 @@ def test_times_string_col_deprecated(): result = df.ewm(halflife="1 day", min_periods=0, times="time_col").mean() expected = df.ewm(halflife=1.0, min_periods=0).mean() tm.assert_frame_equal(result, expected) + + +def test_ewm_sum_adjust_false_notimplemented(): + data = Series(range(1)).ewm(com=1, adjust=False) + with pytest.raises(NotImplementedError, match="sum is not"): + data.sum() + + +@pytest.mark.parametrize( + "expected_data, ignore", + [[[10.0, 5.0, 2.5, 11.25], False], [[10.0, 5.0, 5.0, 12.5], True]], +) +def test_ewm_sum(expected_data, ignore): + # xref from Numbagg tests + # https://github.com/numbagg/numbagg/blob/v0.2.1/numbagg/test/test_moving.py#L50 + data = Series([10, 0, np.nan, 10]) + result = data.ewm(alpha=0.5, ignore_na=ignore).sum() + expected = Series(expected_data) + tm.assert_series_equal(result, expected) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index af2ca7270c982..d47b3e856cb25 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -159,28 +159,31 @@ def add(values, x): @td.skip_if_no("numba") -class TestEWMMean: +class TestEWM: @pytest.mark.parametrize( "grouper", [lambda x: x, lambda x: x.groupby("A")], ids=["None", "groupby"] ) - def test_invalid_engine(self, grouper): + @pytest.mark.parametrize("method", ["mean", "sum"]) + def test_invalid_engine(self, grouper, method): df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) with pytest.raises(ValueError, match="engine must be either"): - grouper(df).ewm(com=1.0).mean(engine="foo") + getattr(grouper(df).ewm(com=1.0), method)(engine="foo") @pytest.mark.parametrize( "grouper", [lambda x: x, lambda x: x.groupby("A")], ids=["None", "groupby"] ) - def test_invalid_engine_kwargs(self, grouper): + @pytest.mark.parametrize("method", ["mean", "sum"]) + def test_invalid_engine_kwargs(self, grouper, method): df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) with pytest.raises(ValueError, match="cython engine does not"): - grouper(df).ewm(com=1.0).mean( + getattr(grouper(df).ewm(com=1.0), method)( engine="cython", engine_kwargs={"nopython": True} ) @pytest.mark.parametrize("grouper", ["None", "groupby"]) + @pytest.mark.parametrize("method", ["mean", "sum"]) def test_cython_vs_numba( - self, grouper, nogil, parallel, nopython, ignore_na, adjust + self, grouper, method, nogil, parallel, nopython, ignore_na, adjust ): if grouper == "None": grouper = lambda x: x @@ -188,15 +191,16 @@ def test_cython_vs_numba( else: grouper = lambda x: x.groupby("A") warn = None - + if method == "sum": + adjust = True df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) ewm = grouper(df).ewm(com=1.0, adjust=adjust, ignore_na=ignore_na) engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} with tm.assert_produces_warning(warn, match="nuisance"): # GH#42738 - result = ewm.mean(engine="numba", engine_kwargs=engine_kwargs) - expected = ewm.mean(engine="cython") + result = getattr(ewm, method)(engine="numba", engine_kwargs=engine_kwargs) + expected = getattr(ewm, method)(engine="cython") tm.assert_frame_equal(result, expected) @@ -358,15 +362,16 @@ def test_table_method_expanding_methods( tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("data", [np.eye(3), np.ones((2, 3)), np.ones((3, 2))]) - def test_table_method_ewm(self, data, axis, nogil, parallel, nopython): + @pytest.mark.parametrize("method", ["mean", "sum"]) + def test_table_method_ewm(self, data, method, axis, nogil, parallel, nopython): engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} df = DataFrame(data) - result = df.ewm(com=1, method="table", axis=axis).mean( + result = getattr(df.ewm(com=1, method="table", axis=axis), method)( engine_kwargs=engine_kwargs, engine="numba" ) - expected = df.ewm(com=1, method="single", axis=axis).mean( + expected = getattr(df.ewm(com=1, method="single", axis=axis), method)( engine_kwargs=engine_kwargs, engine="numba" ) tm.assert_frame_equal(result, expected)