Skip to content

feat: add closed parameter in rolling() #1539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
from __future__ import annotations

import typing
from typing import Sequence, Tuple, Union
from typing import Literal, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
import jellyfish
import pandas as pd

from bigframes import session
from bigframes.core import expression as ex
from bigframes.core import log_adapter
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.expression
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
Expand Down Expand Up @@ -305,13 +305,16 @@ def diff(self, periods=1) -> series.Series:
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
start=-(window - 1),
end=0,
min_periods=min_periods or window,
def rolling(
self,
window: int,
min_periods=None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> windows.Window:
window_spec = window_specs.WindowSpec(
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
)
block = self._block.order_by(
[order.ascending_over(col) for col in self._by_col_ids],
Expand Down Expand Up @@ -361,7 +364,7 @@ def _agg_string(self, func: str) -> df.DataFrame:
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
aggregations: typing.List[bigframes.core.expression.Aggregation] = []
aggregations: typing.List[ex.Aggregation] = []
column_labels = []

want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())
Expand Down Expand Up @@ -738,13 +741,16 @@ def diff(self, periods=1) -> series.Series:
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
start=-(window - 1),
end=0,
min_periods=min_periods or window,
def rolling(
self,
window: int,
min_periods=None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> windows.Window:
window_spec = window_specs.WindowSpec(
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
)
block = self._block.order_by(
[order.ascending_over(col) for col in self._by_col_ids],
Expand Down Expand Up @@ -806,11 +812,9 @@ def _apply_window_op(
return series.Series(block.select_column(result_id))


def agg(input: str, op: agg_ops.AggregateOp) -> bigframes.core.expression.Aggregation:
def agg(input: str, op: agg_ops.AggregateOp) -> ex.Aggregation:
if isinstance(op, agg_ops.UnaryAggregateOp):
return bigframes.core.expression.UnaryAggregation(
op, bigframes.core.expression.deref(input)
)
return ex.UnaryAggregation(op, ex.deref(input))
else:
assert isinstance(op, agg_ops.NullaryAggregateOp)
return bigframes.core.expression.NullaryAggregation(op)
return ex.NullaryAggregation(op)
17 changes: 16 additions & 1 deletion bigframes/core/window_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from dataclasses import dataclass, replace
import itertools
from typing import Mapping, Optional, Set, Tuple, Union
from typing import Literal, Mapping, Optional, Set, Tuple, Union

import bigframes.core.expression as ex
import bigframes.core.identifiers as ids
Expand Down Expand Up @@ -140,6 +140,21 @@ class RowsWindowBounds:
start: Optional[int] = None
end: Optional[int] = None

@classmethod
def from_window_size(
cls, window: int, closed: Literal["right", "left", "both", "neither"]
) -> RowsWindowBounds:
if closed == "right":
return cls(-(window - 1), 0)
elif closed == "left":
return cls(-window, -1)
elif closed == "both":
return cls(-window, 0)
elif closed == "neither":
return cls(-(window - 1), -1)
else:
raise ValueError(f"Unsupported value for 'closed' parameter: {closed}")

def __post_init__(self):
if self.start is None:
return
Expand Down
13 changes: 9 additions & 4 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3308,10 +3308,15 @@ def _perform_join_by_index(
return DataFrame(block)

@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window:
# To get n size window, need current row and n-1 preceding rows.
window_def = windows.rows(
start=-(window - 1), end=0, min_periods=min_periods or window
def rolling(
self,
window: int,
min_periods=None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> bigframes.core.window.Window:
window_def = windows.WindowSpec(
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
)
return bigframes.core.window.Window(
self._block, window_def, self._block.value_columns
Expand Down
16 changes: 10 additions & 6 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@
import typing_extensions

import bigframes.core
from bigframes.core import log_adapter
from bigframes.core import groupby, log_adapter
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.expression as ex
import bigframes.core.groupby as groupby
import bigframes.core.indexers
import bigframes.core.indexes as indexes
import bigframes.core.ordering as order
Expand Down Expand Up @@ -1438,10 +1437,15 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series:
return Series(block)

@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = windows.rows(
start=-(window - 1), end=0, min_periods=min_periods or window
def rolling(
self,
window: int,
min_periods=None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> bigframes.core.window.Window:
window_spec = windows.WindowSpec(
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
)
return bigframes.core.window.Window(
self._block, window_spec, self._block.value_columns, is_series=True
Expand Down
139 changes: 117 additions & 22 deletions tests/system/small/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,111 @@
import pytest


@pytest.fixture(scope="module")
def rolling_dfs(scalars_dfs):
bf_df, pd_df = scalars_dfs

target_cols = ["int64_too", "float64_col", "bool_col"]

bf_df = bf_df[target_cols].set_index("bool_col")
pd_df = pd_df[target_cols].set_index("bool_col")

return bf_df, pd_df


@pytest.fixture(scope="module")
def rolling_series(scalars_dfs):
bf_df, pd_df = scalars_dfs
target_col = "int64_too"

return bf_df[target_col], pd_df[target_col]


@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
def test_dataframe_rolling_closed_param(rolling_dfs, closed):
bf_df, pd_df = rolling_dfs

actual_result = bf_df.rolling(window=3, closed=closed).sum().to_pandas()

expected_result = pd_df.rolling(window=3, closed=closed).sum()
pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)


@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed):
bf_df, pd_df = rolling_dfs

actual_result = (
bf_df.groupby(level=0).rolling(window=3, closed=closed).sum().to_pandas()
)

expected_result = pd_df.groupby(level=0).rolling(window=3, closed=closed).sum()
pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)


def test_dataframe_rolling_default_closed_param(rolling_dfs):
bf_df, pd_df = rolling_dfs

actual_result = bf_df.rolling(window=3).sum().to_pandas()

expected_result = pd_df.rolling(window=3).sum()
pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)


def test_dataframe_groupby_rolling_default_closed_param(rolling_dfs):
bf_df, pd_df = rolling_dfs

actual_result = bf_df.groupby(level=0).rolling(window=3).sum().to_pandas()

expected_result = pd_df.groupby(level=0).rolling(window=3).sum()
pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False)


@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
def test_series_rolling_closed_param(rolling_series, closed):
bf_series, df_series = rolling_series

actual_result = bf_series.rolling(window=3, closed=closed).sum().to_pandas()

expected_result = df_series.rolling(window=3, closed=closed).sum()
pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)


@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
def test_series_groupby_rolling_closed_param(rolling_series, closed):
bf_series, df_series = rolling_series

actual_result = (
bf_series.groupby(bf_series % 2)
.rolling(window=3, closed=closed)
.sum()
.to_pandas()
)

expected_result = (
df_series.groupby(df_series % 2).rolling(window=3, closed=closed).sum()
)
pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)


def test_series_rolling_default_closed_param(rolling_series):
bf_series, df_series = rolling_series

actual_result = bf_series.rolling(window=3).sum().to_pandas()

expected_result = df_series.rolling(window=3).sum()
pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)


def test_series_groupby_rolling_default_closed_param(rolling_series):
bf_series, df_series = rolling_series

actual_result = bf_series.groupby(bf_series % 2).rolling(window=3).sum().to_pandas()

expected_result = df_series.groupby(df_series % 2).rolling(window=3).sum()
pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False)


@pytest.mark.parametrize(
("windowing"),
[
Expand All @@ -41,20 +146,13 @@
pytest.param(lambda x: x.var(), id="var"),
],
)
def test_series_window_agg_ops(
scalars_df_index, scalars_pandas_df_index, windowing, agg_op
):
col_name = "int64_too"
bf_series = agg_op(windowing(scalars_df_index[col_name])).to_pandas()
pd_series = agg_op(windowing(scalars_pandas_df_index[col_name]))

# Pandas always converts to float64, even for min/max/count, which is not desired
pd_series = pd_series.astype(bf_series.dtype)

pd.testing.assert_series_equal(
pd_series,
bf_series,
)
def test_series_window_agg_ops(rolling_series, windowing, agg_op):
bf_series, pd_series = rolling_series

actual_result = agg_op(windowing(bf_series)).to_pandas()

expected_result = agg_op(windowing(pd_series))
pd.testing.assert_series_equal(expected_result, actual_result, check_dtype=False)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -83,13 +181,10 @@ def test_series_window_agg_ops(
pytest.param(lambda x: x.var(), id="var"),
],
)
def test_dataframe_window_agg_ops(
scalars_df_index, scalars_pandas_df_index, windowing, agg_op
):
scalars_df_index = scalars_df_index.set_index("bool_col")
scalars_pandas_df_index = scalars_pandas_df_index.set_index("bool_col")
col_names = ["int64_too", "float64_col"]
bf_result = agg_op(windowing(scalars_df_index[col_names])).to_pandas()
pd_result = agg_op(windowing(scalars_pandas_df_index[col_names]))
def test_dataframe_window_agg_ops(rolling_dfs, windowing, agg_op):
bf_df, pd_df = rolling_dfs

bf_result = agg_op(windowing(bf_df)).to_pandas()

pd_result = agg_op(windowing(pd_df))
pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)
21 changes: 21 additions & 0 deletions tests/unit/core/test_windowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,24 @@ def test_invalid_rows_window_boundary_raise_error(start, end):
def test_invalid_range_window_boundary_raise_error(start, end):
with pytest.raises(ValueError):
window_spec.RangeWindowBounds(start, end)


@pytest.mark.parametrize(
("window", "closed", "start", "end"),
[
pytest.param(3, "left", -3, -1, id="left"),
pytest.param(3, "right", -2, 0, id="right"),
pytest.param(3, "neither", -2, -1, id="neither"),
pytest.param(3, "both", -3, 0, id="both"),
],
)
def test_rows_window_bounds_from_window_size(window, closed, start, end):
actual_result = window_spec.RowsWindowBounds.from_window_size(window, closed)

expected_result = window_spec.RowsWindowBounds(start, end)
assert actual_result == expected_result


def test_rows_window_bounds_from_window_size_invalid_closed_raise_error():
with pytest.raises(ValueError):
window_spec.RowsWindowBounds.from_window_size(3, "whatever") # type:ignore