diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py
index 3134df0daf..29d61f3075 100644
--- a/bigframes/core/groupby/__init__.py
+++ b/bigframes/core/groupby/__init__.py
@@ -15,7 +15,7 @@
 from __future__ import annotations
 
 import typing
-from typing import Sequence, Tuple, Union
+from typing import Literal, Sequence, Tuple, Union
 
 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
@@ -23,10 +23,10 @@
 import pandas as pd
 
 from bigframes import session
+from bigframes.core import expression as ex
 from bigframes.core import log_adapter
 import bigframes.core.block_transforms as block_ops
 import bigframes.core.blocks as blocks
-import bigframes.core.expression
 import bigframes.core.ordering as order
 import bigframes.core.utils as utils
 import bigframes.core.validations as validations
@@ -305,13 +305,16 @@ def diff(self, periods=1) -> series.Series:
         return self._apply_window_op(agg_ops.DiffOp(periods), window=window)
 
     @validations.requires_ordering()
-    def rolling(self, window: int, min_periods=None) -> windows.Window:
-        # To get n size window, need current row and n-1 preceding rows.
-        window_spec = window_specs.rows(
-            grouping_keys=tuple(self._by_col_ids),
-            start=-(window - 1),
-            end=0,
-            min_periods=min_periods or window,
+    def rolling(
+        self,
+        window: int,
+        min_periods=None,
+        closed: Literal["right", "left", "both", "neither"] = "right",
+    ) -> windows.Window:
+        window_spec = window_specs.WindowSpec(
+            bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
+            min_periods=min_periods if min_periods is not None else window,
+            grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
         )
         block = self._block.order_by(
             [order.ascending_over(col) for col in self._by_col_ids],
@@ -361,7 +364,7 @@ def _agg_string(self, func: str) -> df.DataFrame:
         return dataframe if self._as_index else self._convert_index(dataframe)
 
     def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
-        aggregations: typing.List[bigframes.core.expression.Aggregation] = []
+        aggregations: typing.List[ex.Aggregation] = []
         column_labels = []
 
         want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())
@@ -738,13 +741,16 @@ def diff(self, periods=1) -> series.Series:
         return self._apply_window_op(agg_ops.DiffOp(periods), window=window)
 
     @validations.requires_ordering()
-    def rolling(self, window: int, min_periods=None) -> windows.Window:
-        # To get n size window, need current row and n-1 preceding rows.
-        window_spec = window_specs.rows(
-            grouping_keys=tuple(self._by_col_ids),
-            start=-(window - 1),
-            end=0,
-            min_periods=min_periods or window,
+    def rolling(
+        self,
+        window: int,
+        min_periods=None,
+        closed: Literal["right", "left", "both", "neither"] = "right",
+    ) -> windows.Window:
+        window_spec = window_specs.WindowSpec(
+            bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
+            min_periods=min_periods if min_periods is not None else window,
+            grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
         )
         block = self._block.order_by(
             [order.ascending_over(col) for col in self._by_col_ids],
@@ -806,11 +812,9 @@ def _apply_window_op(
         return series.Series(block.select_column(result_id))
 
 
-def agg(input: str, op: agg_ops.AggregateOp) -> bigframes.core.expression.Aggregation:
+def agg(input: str, op: agg_ops.AggregateOp) -> ex.Aggregation:
     if isinstance(op, agg_ops.UnaryAggregateOp):
-        return bigframes.core.expression.UnaryAggregation(
-            op, bigframes.core.expression.deref(input)
-        )
+        return ex.UnaryAggregation(op, ex.deref(input))
     else:
         assert isinstance(op, agg_ops.NullaryAggregateOp)
-        return bigframes.core.expression.NullaryAggregation(op)
+        return ex.NullaryAggregation(op)
diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py
index a286234fc8..142e3a7e00 100644
--- a/bigframes/core/window_spec.py
+++ b/bigframes/core/window_spec.py
@@ -15,7 +15,7 @@
 
 from dataclasses import dataclass, replace
 import itertools
-from typing import Mapping, Optional, Set, Tuple, Union
+from typing import Literal, Mapping, Optional, Set, Tuple, Union
 
 import bigframes.core.expression as ex
 import bigframes.core.identifiers as ids
@@ -140,6 +140,21 @@ class RowsWindowBounds:
     start: Optional[int] = None
     end: Optional[int] = None
 
+    @classmethod
+    def from_window_size(
+        cls, window: int, closed: Literal["right", "left", "both", "neither"]
+    ) -> RowsWindowBounds:
+        if closed == "right":
+            return cls(-(window - 1), 0)
+        elif closed == "left":
+            return cls(-window, -1)
+        elif closed == "both":
+            return cls(-window, 0)
+        elif closed == "neither":
+            return cls(-(window - 1), -1)
+        else:
+            raise ValueError(f"Unsupported value for 'closed' parameter: {closed}")
+
     def __post_init__(self):
         if self.start is None:
             return
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index cc74b72716..7f9e62b7dd 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -3308,10 +3308,15 @@ def _perform_join_by_index(
         return DataFrame(block)
 
     @validations.requires_ordering()
-    def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window:
-        # To get n size window, need current row and n-1 preceding rows.
-        window_def = windows.rows(
-            start=-(window - 1), end=0, min_periods=min_periods or window
+    def rolling(
+        self,
+        window: int,
+        min_periods=None,
+        closed: Literal["right", "left", "both", "neither"] = "right",
+    ) -> bigframes.core.window.Window:
+        window_def = windows.WindowSpec(
+            bounds=windows.RowsWindowBounds.from_window_size(window, closed),
+            min_periods=min_periods if min_periods is not None else window,
         )
         return bigframes.core.window.Window(
             self._block, window_def, self._block.value_columns
diff --git a/bigframes/series.py b/bigframes/series.py
index a33a3fca5c..be87129929 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -35,11 +35,10 @@
 import typing_extensions
 
 import bigframes.core
-from bigframes.core import log_adapter
+from bigframes.core import groupby, log_adapter
 import bigframes.core.block_transforms as block_ops
 import bigframes.core.blocks as blocks
 import bigframes.core.expression as ex
-import bigframes.core.groupby as groupby
 import bigframes.core.indexers
 import bigframes.core.indexes as indexes
 import bigframes.core.ordering as order
@@ -1438,10 +1437,15 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series:
         return Series(block)
 
     @validations.requires_ordering()
-    def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window:
-        # To get n size window, need current row and n-1 preceding rows.
-        window_spec = windows.rows(
-            start=-(window - 1), end=0, min_periods=min_periods or window
+    def rolling(
+        self,
+        window: int,
+        min_periods=None,
+        closed: Literal["right", "left", "both", "neither"] = "right",
+    ) -> bigframes.core.window.Window:
+        window_spec = windows.WindowSpec(
+            bounds=windows.RowsWindowBounds.from_window_size(window, closed),
+            min_periods=min_periods if min_periods is not None else window,
         )
         return bigframes.core.window.Window(
             self._block, window_spec, self._block.value_columns, is_series=True
diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py
index 2b9ec1a3c0..68613f1372 100644
--- a/tests/system/small/test_window.py
+++ b/tests/system/small/test_window.py
@@ -16,6 +16,111 @@
 import pytest
 
 
+@pytest.fixture(scope="module")
+def rolling_dfs(scalars_dfs):
+    bf_df, pd_df = scalars_dfs
+
+    target_cols = ["int64_too", "float64_col", "bool_col"]
+
+    bf_df = bf_df[target_cols].set_index("bool_col")
+    pd_df = pd_df[target_cols].set_index("bool_col")
+
+    return bf_df, pd_df
+
+
+@pytest.fixture(scope="module")
+def rolling_series(scalars_dfs):
+    bf_df, pd_df = scalars_dfs
+    target_col = "int64_too"
+
+    return bf_df[target_col], pd_df[target_col]
+
+
+@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
+def test_dataframe_rolling_closed_param(rolling_dfs, closed):
+    bf_df, pd_df = rolling_dfs
+
+    actual_result = bf_df.rolling(window=3, closed=closed).sum().to_pandas()
+
+    expected_result = pd_df.rolling(window=3, closed=closed).sum()
+    pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)
+
+
+@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
+def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed):
+    bf_df, pd_df = rolling_dfs
+
+    actual_result = (
+        bf_df.groupby(level=0).rolling(window=3, closed=closed).sum().to_pandas()
+    )
+
+    expected_result = pd_df.groupby(level=0).rolling(window=3, closed=closed).sum()
+    pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)
+
+
+def test_dataframe_rolling_default_closed_param(rolling_dfs):
+    bf_df, pd_df = rolling_dfs
+
+    actual_result = bf_df.rolling(window=3).sum().to_pandas()
+
+    expected_result = pd_df.rolling(window=3).sum()
+    pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)
+
+
+def test_dataframe_groupby_rolling_default_closed_param(rolling_dfs):
+    bf_df, pd_df = rolling_dfs
+
+    actual_result = bf_df.groupby(level=0).rolling(window=3).sum().to_pandas()
+
+    expected_result = pd_df.groupby(level=0).rolling(window=3).sum()
+    pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)
+
+
+@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
+def test_series_rolling_closed_param(rolling_series, closed):
+    bf_series, df_series = rolling_series
+
+    actual_result = bf_series.rolling(window=3, closed=closed).sum().to_pandas()
+
+    expected_result = df_series.rolling(window=3, closed=closed).sum()
+    pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)
+
+
+@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
+def test_series_groupby_rolling_closed_param(rolling_series, closed):
+    bf_series, df_series = rolling_series
+
+    actual_result = (
+        bf_series.groupby(bf_series % 2)
+        .rolling(window=3, closed=closed)
+        .sum()
+        .to_pandas()
+    )
+
+    expected_result = (
+        df_series.groupby(df_series % 2).rolling(window=3, closed=closed).sum()
+    )
+    pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)
+
+
+def test_series_rolling_default_closed_param(rolling_series):
+    bf_series, df_series = rolling_series
+
+    actual_result = bf_series.rolling(window=3).sum().to_pandas()
+
+    expected_result = df_series.rolling(window=3).sum()
+    pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)
+
+
+def test_series_groupby_rolling_default_closed_param(rolling_series):
+    bf_series, df_series = rolling_series
+
+    actual_result = bf_series.groupby(bf_series % 2).rolling(window=3).sum().to_pandas()
+
+    expected_result = df_series.groupby(df_series % 2).rolling(window=3).sum()
+    pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)
+
+
 @pytest.mark.parametrize(
     ("windowing"),
     [
@@ -41,20 +146,13 @@
         pytest.param(lambda x: x.var(), id="var"),
     ],
 )
-def test_series_window_agg_ops(
-    scalars_df_index, scalars_pandas_df_index, windowing, agg_op
-):
-    col_name = "int64_too"
-    bf_series = agg_op(windowing(scalars_df_index[col_name])).to_pandas()
-    pd_series = agg_op(windowing(scalars_pandas_df_index[col_name]))
-
-    # Pandas always converts to float64, even for min/max/count, which is not desired
-    pd_series = pd_series.astype(bf_series.dtype)
-
-    pd.testing.assert_series_equal(
-        pd_series,
-        bf_series,
-    )
+def test_series_window_agg_ops(rolling_series, windowing, agg_op):
+    bf_series, pd_series = rolling_series
+
+    actual_result = agg_op(windowing(bf_series)).to_pandas()
+
+    expected_result = agg_op(windowing(pd_series))
+    pd.testing.assert_series_equal(expected_result, actual_result, check_dtype=False)
 
 
 @pytest.mark.parametrize(
@@ -83,13 +181,10 @@ def test_series_window_agg_ops(
         pytest.param(lambda x: x.var(), id="var"),
     ],
 )
-def test_dataframe_window_agg_ops(
-    scalars_df_index, scalars_pandas_df_index, windowing, agg_op
-):
-    scalars_df_index = scalars_df_index.set_index("bool_col")
-    scalars_pandas_df_index = scalars_pandas_df_index.set_index("bool_col")
-    col_names = ["int64_too", "float64_col"]
-    bf_result = agg_op(windowing(scalars_df_index[col_names])).to_pandas()
-    pd_result = agg_op(windowing(scalars_pandas_df_index[col_names]))
+def test_dataframe_window_agg_ops(rolling_dfs, windowing, agg_op):
+    bf_df, pd_df = rolling_dfs
+
+    bf_result = agg_op(windowing(bf_df)).to_pandas()
+    pd_result = agg_op(windowing(pd_df))
 
     pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)
diff --git a/tests/unit/core/test_windowspec.py b/tests/unit/core/test_windowspec.py
index a630c87dda..b9de764136 100644
--- a/tests/unit/core/test_windowspec.py
+++ b/tests/unit/core/test_windowspec.py
@@ -27,3 +27,24 @@ def test_invalid_rows_window_boundary_raise_error(start, end):
 def test_invalid_range_window_boundary_raise_error(start, end):
     with pytest.raises(ValueError):
         window_spec.RangeWindowBounds(start, end)
+
+
+@pytest.mark.parametrize(
+    ("window", "closed", "start", "end"),
+    [
+        pytest.param(3, "left", -3, -1, id="left"),
+        pytest.param(3, "right", -2, 0, id="right"),
+        pytest.param(3, "neither", -2, -1, id="neither"),
+        pytest.param(3, "both", -3, 0, id="both"),
+    ],
+)
+def test_rows_window_bounds_from_window_size(window, closed, start, end):
+    actual_result = window_spec.RowsWindowBounds.from_window_size(window, closed)
+
+    expected_result = window_spec.RowsWindowBounds(start, end)
+    assert actual_result == expected_result
+
+
+def test_rows_window_bounds_from_window_size_invalid_closed_raise_error():
+    with pytest.raises(ValueError):
+        window_spec.RowsWindowBounds.from_window_size(3, "whatever")  # type:ignore