diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index 73adb12c171bf..1c53d4adc8c25 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -53,7 +53,7 @@ class NumbaEngineMethods: ["DataFrame", "Series"], ["int", "float"], [("rolling", {"window": 10}), ("expanding", {})], - ["sum", "max", "min", "median", "mean"], + ["sum", "max", "min", "median", "mean", "var", "std"], [True, False], [None, 100], ) diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index e87f5f53256cf..db5cce8459ca2 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -214,6 +214,7 @@ Other enhancements - :meth:`Timestamp.isoformat`, now handles the ``timespec`` argument from the base :class:``datetime`` class (:issue:`26131`) - :meth:`NaT.to_numpy` ``dtype`` argument is now respected, so ``np.timedelta64`` can be returned (:issue:`44460`) - New option ``display.max_dir_items`` customizes the number of columns added to :meth:`Dataframe.__dir__` and suggested for tab completion (:issue:`37996`) +- :meth:`.Rolling.var`, :meth:`.Expanding.var`, :meth:`.Rolling.std`, :meth:`.Expanding.std` now support `Numba <http://numba.pydata.org/>`_ execution with the ``engine`` keyword (:issue:`44461`) .. 
--------------------------------------------------------------------------- diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index c2b6191c05152..acb0c6d175c51 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -51,10 +51,11 @@ def column_looper( start: np.ndarray, end: np.ndarray, min_periods: int, + *args, ): result = np.empty((len(start), values.shape[1]), dtype=np.float64) for i in numba.prange(values.shape[1]): - result[:, i] = func(values[:, i], start, end, min_periods) + result[:, i] = func(values[:, i], start, end, min_periods, *args) return result return column_looper diff --git a/pandas/core/_numba/kernels/__init__.py b/pandas/core/_numba/kernels/__init__.py index 23b0ec5c3d8aa..2753a1e01161d 100644 --- a/pandas/core/_numba/kernels/__init__.py +++ b/pandas/core/_numba/kernels/__init__.py @@ -1,4 +1,5 @@ from pandas.core._numba.kernels.mean_ import sliding_mean from pandas.core._numba.kernels.sum_ import sliding_sum +from pandas.core._numba.kernels.var_ import sliding_var -__all__ = ["sliding_mean", "sliding_sum"] +__all__ = ["sliding_mean", "sliding_sum", "sliding_var"] diff --git a/pandas/core/_numba/kernels/var_.py b/pandas/core/_numba/kernels/var_.py new file mode 100644 index 0000000000000..2e5660673701b --- /dev/null +++ b/pandas/core/_numba/kernels/var_.py @@ -0,0 +1,116 @@ +""" +Numba 1D var kernels that can be shared by +* Dataframe / Series +* groupby +* rolling / expanding + +Mirrors pandas/_libs/window/aggregation.pyx +""" +from __future__ import annotations + +import numba +import numpy as np + +from pandas.core._numba.kernels.shared import is_monotonic_increasing + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def add_var( + val: float, nobs: int, mean_x: float, ssqdm_x: float, compensation: float +) -> tuple[int, float, float, float]: + if not np.isnan(val): + nobs += 1 + prev_mean = mean_x - compensation + y = val - compensation + t = y - mean_x + compensation = t + 
mean_x - y + delta = t + if nobs: + mean_x += delta / nobs + else: + mean_x = 0 + ssqdm_x += (val - prev_mean) * (val - mean_x) + return nobs, mean_x, ssqdm_x, compensation + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def remove_var( + val: float, nobs: int, mean_x: float, ssqdm_x: float, compensation: float +) -> tuple[int, float, float, float]: + if not np.isnan(val): + nobs -= 1 + if nobs: + prev_mean = mean_x - compensation + y = val - compensation + t = y - mean_x + compensation = t + mean_x - y + delta = t + mean_x -= delta / nobs + ssqdm_x -= (val - prev_mean) * (val - mean_x) + else: + mean_x = 0 + ssqdm_x = 0 + return nobs, mean_x, ssqdm_x, compensation + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def sliding_var( + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, + ddof: int = 1, +) -> np.ndarray: + N = len(start) + nobs = 0 + mean_x = 0.0 + ssqdm_x = 0.0 + compensation_add = 0.0 + compensation_remove = 0.0 + + min_periods = max(min_periods, 1) + is_monotonic_increasing_bounds = is_monotonic_increasing( + start + ) and is_monotonic_increasing(end) + + output = np.empty(N, dtype=np.float64) + + for i in range(N): + s = start[i] + e = end[i] + if i == 0 or not is_monotonic_increasing_bounds: + for j in range(s, e): + val = values[j] + nobs, mean_x, ssqdm_x, compensation_add = add_var( + val, nobs, mean_x, ssqdm_x, compensation_add + ) + else: + for j in range(start[i - 1], s): + val = values[j] + nobs, mean_x, ssqdm_x, compensation_remove = remove_var( + val, nobs, mean_x, ssqdm_x, compensation_remove + ) + + for j in range(end[i - 1], e): + val = values[j] + nobs, mean_x, ssqdm_x, compensation_add = add_var( + val, nobs, mean_x, ssqdm_x, compensation_add + ) + + if nobs >= min_periods and nobs > ddof: + if nobs == 1: + result = 0.0 + else: + result = ssqdm_x / (nobs - ddof) + else: + result = np.nan + + output[i] = result + + if not is_monotonic_increasing_bounds: + nobs = 0 + mean_x = 0.0 + 
ssqdm_x = 0.0 + compensation_remove = 0.0 + + return output diff --git a/pandas/core/window/doc.py b/pandas/core/window/doc.py index 61f388c35df0f..930c12841e4e4 100644 --- a/pandas/core/window/doc.py +++ b/pandas/core/window/doc.py @@ -98,14 +98,17 @@ def create_section_header(header: str) -> str: "extended documentation and performance considerations for the Numba engine.\n\n" ) -window_agg_numba_parameters = dedent( - """ + +def window_agg_numba_parameters(version: str = "1.3") -> str: + return ( + dedent( + """ engine : str, default None * ``'cython'`` : Runs the operation through C-extensions from cython. * ``'numba'`` : Runs the operation through JIT compiled code from numba. * ``None`` : Defaults to ``'cython'`` or globally setting ``compute.use_numba`` - .. versionadded:: 1.3.0 + .. versionadded:: {version}.0 engine_kwargs : dict, default None * For ``'cython'`` engine, there are no accepted ``engine_kwargs`` @@ -114,6 +117,9 @@ def create_section_header(header: str) -> str: ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is ``{{'nopython': True, 'nogil': False, 'parallel': False}}`` - .. versionadded:: 1.3.0\n + .. 
versionadded:: {version}.0\n """ -).replace("\n", "", 1) + ) + .replace("\n", "", 1) + .replace("{version}", version) + ) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index d91388e9722f7..4bebc56273805 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -511,7 +511,7 @@ def aggregate(self, func, *args, **kwargs): template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -565,7 +565,7 @@ def mean(self, *args, engine=None, engine_kwargs=None, **kwargs): template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 796849e622ff2..8c8b7a8284684 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -227,7 +227,7 @@ def apply( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -253,7 +253,7 @@ def sum( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -279,7 +279,7 @@ def max( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -305,7 +305,7 @@ def min( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -330,7 +330,7 @@ def mean( @doc( 
template_header, create_section_header("Parameters"), - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -361,6 +361,7 @@ def median( """ ).replace("\n", "", 1), args_compat, + window_agg_numba_parameters("1.4"), kwargs_compat, create_section_header("Returns"), template_returns, @@ -396,9 +397,18 @@ def median( aggregation_description="standard deviation", agg_method="std", ) - def std(self, ddof: int = 1, *args, **kwargs): + def std( + self, + ddof: int = 1, + *args, + engine: str | None = None, + engine_kwargs: dict[str, bool] | None = None, + **kwargs, + ): nv.validate_expanding_func("std", args, kwargs) - return super().std(ddof=ddof, **kwargs) + return super().std( + ddof=ddof, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) @doc( template_header, @@ -411,6 +421,7 @@ def std(self, ddof: int = 1, *args, **kwargs): """ ).replace("\n", "", 1), args_compat, + window_agg_numba_parameters("1.4"), kwargs_compat, create_section_header("Returns"), template_returns, @@ -446,9 +457,18 @@ def std(self, ddof: int = 1, *args, **kwargs): aggregation_description="variance", agg_method="var", ) - def var(self, ddof: int = 1, *args, **kwargs): + def var( + self, + ddof: int = 1, + *args, + engine: str | None = None, + engine_kwargs: dict[str, bool] | None = None, + **kwargs, + ): nv.validate_expanding_func("var", args, kwargs) - return super().var(ddof=ddof, **kwargs) + return super().var( + ddof=ddof, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) @doc( template_header, diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f9244462123bc..fc3390ee6db03 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -598,6 +598,7 @@ def _numba_apply( func: Callable[..., Any], numba_cache_key_str: str, engine_kwargs: dict[str, bool] | None = None, + *func_args, ): window_indexer = self._get_window_indexer() min_periods = ( @@ -621,7 
+622,7 @@ def _numba_apply( aggregator = executor.generate_shared_aggregator( func, engine_kwargs, numba_cache_key_str ) - result = aggregator(values, start, end, min_periods) + result = aggregator(values, start, end, min_periods, *func_args) NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator result = result.T if self.axis == 1 else result if obj.ndim == 1: @@ -1459,8 +1460,24 @@ def median( window_func = window_aggregations.roll_median_c return self._apply(window_func, name="median", **kwargs) - def std(self, ddof: int = 1, *args, **kwargs): + def std( + self, + ddof: int = 1, + *args, + engine: str | None = None, + engine_kwargs: dict[str, bool] | None = None, + **kwargs, + ): nv.validate_window_func("std", args, kwargs) + if maybe_use_numba(engine): + if self.method == "table": + raise NotImplementedError("std not supported with method='table'") + else: + from pandas.core._numba.kernels import sliding_var + + return zsqrt( + self._numba_apply(sliding_var, "rolling_std", engine_kwargs, ddof) + ) window_func = window_aggregations.roll_var def zsqrt_func(values, begin, end, min_periods): @@ -1472,8 +1489,24 @@ def zsqrt_func(values, begin, end, min_periods): **kwargs, ) - def var(self, ddof: int = 1, *args, **kwargs): + def var( + self, + ddof: int = 1, + *args, + engine: str | None = None, + engine_kwargs: dict[str, bool] | None = None, + **kwargs, + ): nv.validate_window_func("var", args, kwargs) + if maybe_use_numba(engine): + if self.method == "table": + raise NotImplementedError("var not supported with method='table'") + else: + from pandas.core._numba.kernels import sliding_var + + return self._numba_apply( + sliding_var, "rolling_var", engine_kwargs, ddof + ) window_func = partial(window_aggregations.roll_var, ddof=ddof) return self._apply( window_func, @@ -1810,7 +1843,7 @@ def apply( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, 
create_section_header("Returns"), template_returns, @@ -1884,7 +1917,7 @@ def sum( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -1910,7 +1943,7 @@ def max( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -1951,7 +1984,7 @@ def min( template_header, create_section_header("Parameters"), args_compat, - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -1998,7 +2031,7 @@ def mean( @doc( template_header, create_section_header("Parameters"), - window_agg_numba_parameters, + window_agg_numba_parameters(), kwargs_compat, create_section_header("Returns"), template_returns, @@ -2044,6 +2077,7 @@ def median( """ ).replace("\n", "", 1), args_compat, + window_agg_numba_parameters("1.4"), kwargs_compat, create_section_header("Returns"), template_returns, @@ -2081,9 +2115,18 @@ def median( aggregation_description="standard deviation", agg_method="std", ) - def std(self, ddof: int = 1, *args, **kwargs): + def std( + self, + ddof: int = 1, + *args, + engine: str | None = None, + engine_kwargs: dict[str, bool] | None = None, + **kwargs, + ): nv.validate_rolling_func("std", args, kwargs) - return super().std(ddof=ddof, **kwargs) + return super().std( + ddof=ddof, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) @doc( template_header, @@ -2096,6 +2139,7 @@ def std(self, ddof: int = 1, *args, **kwargs): """ ).replace("\n", "", 1), args_compat, + window_agg_numba_parameters("1.4"), kwargs_compat, create_section_header("Returns"), template_returns, @@ -2133,9 +2177,18 @@ def std(self, ddof: int = 1, *args, **kwargs): aggregation_description="variance", agg_method="var", ) - def var(self, ddof: 
int = 1, *args, **kwargs): + def var( + self, + ddof: int = 1, + *args, + engine: str | None = None, + engine_kwargs: dict[str, bool] | None = None, + **kwargs, + ): nv.validate_rolling_func("var", args, kwargs) - return super().var(ddof=ddof, **kwargs) + return super().var( + ddof=ddof, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) @doc( template_header, diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index 7b1aa93b5923a..bf1af0c83c93f 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -64,11 +64,15 @@ def arithmetic_win_operators(request): @pytest.fixture( params=[ - "sum", - "mean", - "median", - "max", - "min", + ["sum", {}], + ["mean", {}], + ["median", {}], + ["max", {}], + ["min", {}], + ["var", {}], + ["var", {"ddof": 0}], + ["std", {}], + ["std", {"ddof": 0}], ] ) def arithmetic_numba_supported_operators(request): diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index ad291344fd6ed..8cae9c0182724 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -50,16 +50,18 @@ def test_numba_vs_cython_rolling_methods( self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators ): - method = arithmetic_numba_supported_operators + method, kwargs = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} roll = data.rolling(2) - result = getattr(roll, method)(engine="numba", engine_kwargs=engine_kwargs) - expected = getattr(roll, method)(engine="cython") + result = getattr(roll, method)( + engine="numba", engine_kwargs=engine_kwargs, **kwargs + ) + expected = getattr(roll, method)(engine="cython", **kwargs) # Check the cache - if method not in ("mean", "sum"): + if method not in ("mean", "sum", "var", "std"): assert ( getattr(np, f"nan{method}"), "Rolling_apply_single", @@ -74,17 +76,19 @@ def test_numba_vs_cython_expanding_methods( self, data, nogil, 
parallel, nopython, arithmetic_numba_supported_operators ): - method = arithmetic_numba_supported_operators + method, kwargs = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} data = DataFrame(np.eye(5)) expand = data.expanding() - result = getattr(expand, method)(engine="numba", engine_kwargs=engine_kwargs) - expected = getattr(expand, method)(engine="cython") + result = getattr(expand, method)( + engine="numba", engine_kwargs=engine_kwargs, **kwargs + ) + expected = getattr(expand, method)(engine="cython", **kwargs) # Check the cache - if method not in ("mean", "sum"): + if method not in ("mean", "sum", "var", "std"): assert ( getattr(np, f"nan{method}"), "Expanding_apply_single", @@ -282,19 +286,26 @@ def f(x): def test_table_method_rolling_methods( self, axis, nogil, parallel, nopython, arithmetic_numba_supported_operators ): - method = arithmetic_numba_supported_operators + method, kwargs = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} df = DataFrame(np.eye(3)) - - result = getattr( - df.rolling(2, method="table", axis=axis, min_periods=0), method - )(engine_kwargs=engine_kwargs, engine="numba") - expected = getattr( - df.rolling(2, method="single", axis=axis, min_periods=0), method - )(engine_kwargs=engine_kwargs, engine="numba") - tm.assert_frame_equal(result, expected) + roll_table = df.rolling(2, method="table", axis=axis, min_periods=0) + if method in ("var", "std"): + with pytest.raises(NotImplementedError, match=f"{method} not supported"): + getattr(roll_table, method)( + engine_kwargs=engine_kwargs, engine="numba", **kwargs + ) + else: + roll_single = df.rolling(2, method="single", axis=axis, min_periods=0) + result = getattr(roll_table, method)( + engine_kwargs=engine_kwargs, engine="numba", **kwargs + ) + expected = getattr(roll_single, method)( + engine_kwargs=engine_kwargs, engine="numba", **kwargs + ) + 
tm.assert_frame_equal(result, expected) def test_table_method_rolling_apply(self, axis, nogil, parallel, nopython): engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} @@ -349,19 +360,26 @@ def f(x): def test_table_method_expanding_methods( self, axis, nogil, parallel, nopython, arithmetic_numba_supported_operators ): - method = arithmetic_numba_supported_operators + method, kwargs = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} df = DataFrame(np.eye(3)) - - result = getattr(df.expanding(method="table", axis=axis), method)( - engine_kwargs=engine_kwargs, engine="numba" - ) - expected = getattr(df.expanding(method="single", axis=axis), method)( - engine_kwargs=engine_kwargs, engine="numba" - ) - tm.assert_frame_equal(result, expected) + expand_table = df.expanding(method="table", axis=axis) + if method in ("var", "std"): + with pytest.raises(NotImplementedError, match=f"{method} not supported"): + getattr(expand_table, method)( + engine_kwargs=engine_kwargs, engine="numba", **kwargs + ) + else: + expand_single = df.expanding(method="single", axis=axis) + result = getattr(expand_table, method)( + engine_kwargs=engine_kwargs, engine="numba", **kwargs + ) + expected = getattr(expand_single, method)( + engine_kwargs=engine_kwargs, engine="numba", **kwargs + ) + tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("data", [np.eye(3), np.ones((2, 3)), np.ones((3, 2))]) @pytest.mark.parametrize("method", ["mean", "sum"])