Skip to content

CLN: Unify Window._apply_window and Rolling._apply functions #27403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pandas/_libs/window.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1675,9 +1675,25 @@ def roll_generic(object obj,
return output


def roll_window(ndarray[float64_t, ndim=1, cast=True] values,
ndarray[float64_t, ndim=1, cast=True] weights,
int minp, bint avg=True):
# ----------------------------------------------------------------------
# Rolling sum and mean for weighted window


def roll_weighted_sum(ndarray[float64_t, ndim=1, cast=True] values,
                      ndarray[float64_t, ndim=1, cast=True] weights,
                      int minp):
    """
    Rolling weighted sum over ``values`` using the window ``weights``.

    Thin wrapper that forwards all arguments unchanged to
    ``_roll_weighted_sum_mean`` with ``avg=0`` (averaging disabled) and
    returns whatever that helper returns.

    Parameters
    ----------
    values : 1-D float64 ndarray
    weights : 1-D float64 ndarray
    minp : int
        Forwarded as-is to ``_roll_weighted_sum_mean``.
    """
    return _roll_weighted_sum_mean(values, weights, minp, avg=0)


def roll_weighted_mean(ndarray[float64_t, ndim=1, cast=True] values,
                       ndarray[float64_t, ndim=1, cast=True] weights,
                       int minp):
    """
    Rolling weighted mean over ``values`` using the window ``weights``.

    Thin wrapper that forwards all arguments unchanged to
    ``_roll_weighted_sum_mean`` with ``avg=1`` (averaging enabled) and
    returns whatever that helper returns.

    Parameters
    ----------
    values : 1-D float64 ndarray
    weights : 1-D float64 ndarray
    minp : int
        Forwarded as-is to ``_roll_weighted_sum_mean``.
    """
    return _roll_weighted_sum_mean(values, weights, minp, avg=1)


def _roll_weighted_sum_mean(ndarray[float64_t, ndim=1, cast=True] values,
ndarray[float64_t, ndim=1, cast=True] weights,
int minp, bint avg=True):
"""
Assume len(weights) << len(values)
"""
Expand All @@ -1688,6 +1704,7 @@ def roll_window(ndarray[float64_t, ndim=1, cast=True] values,

in_n = len(values)
win_n = len(weights)

output = np.zeros(in_n, dtype=float)
counts = np.zeros(in_n, dtype=float)
if avg:
Expand Down Expand Up @@ -1739,6 +1756,7 @@ def roll_window(ndarray[float64_t, ndim=1, cast=True] values,

return output


# ----------------------------------------------------------------------
# Exponentially weighted moving average

Expand Down
271 changes: 112 additions & 159 deletions pandas/core/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,116 @@ def _center_window(self, result, window):
result = np.copy(result[tuple(lead_indexer)])
return result

def _apply(
self, func, name=None, window=None, center=None, check_minp=None, **kwargs
):
"""
Rolling statistical measure using supplied function.

Designed to be used with passed-in Cython array-based functions.

Parameters
----------
func : str/callable to apply
name : str, optional
name of this function
window : int/array, default to _get_window()
center : bool, default to self.center
check_minp : function, default to _use_window

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document what `kwargs` is here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we modify kwargs (or a copy of it) first, to remove the window-function kwargs (the ones passed to _get_window)? The window rolling functions (passed to _get_roll_func) don't take kwargs at the moment, but once they do this will be a problem.

Returns
-------
y : type of input
"""
if center is None:
center = self.center

if check_minp is None:
check_minp = _use_window

blocks, obj, index = self._create_blocks()
block_list = list(blocks)

is_window = isinstance(self, Window)

if is_window:
window = self._prep_window(**kwargs)
else:
window = self._get_window()
index, indexi = self._get_index(index=index)

results = []
exclude = []
Copy link
Contributor Author

@ihsansecer ihsansecer Jul 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pandas/core/window.py:426: error: Need type annotation for 'exclude'

I'm not sure what is confusing mypy into requiring a type annotation for `exclude` (this is the reason for the CI failure).

for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError):
if isinstance(obj, ABCDataFrame):
exclude.extend(b.columns)
del block_list[i]
continue
else:
raise DataError("No numeric types to aggregate")

if values.size == 0:
results.append(values.copy())
continue

# if we have a string function name, wrap it
if isinstance(func, str):
cfunc = getattr(libwindow, func, None)
if cfunc is None:
raise ValueError(
"we do not support this function "
"in libwindow.{func}".format(func=func)
)

def func(arg, window, min_periods=None, closed=None):
# ensure we are only rolling on floats
arg = ensure_float64(arg)

if is_window:
minp = check_minp(min_periods, len(window))
return cfunc(arg, window, minp)

else:
minp = check_minp(min_periods, window)
return cfunc(arg, window, minp, indexi, closed, **kwargs)

# calculation function
if center:
offset = _offset(window, center)
additional_nans = np.array([np.NaN] * offset)

def calc(x):
return func(
np.concatenate((x, additional_nans)),
window,
min_periods=self.min_periods,
closed=self.closed,
)

else:

def calc(x):
return func(
x, window, min_periods=self.min_periods, closed=self.closed
)

with np.errstate(all="ignore"):
if values.ndim > 1 or is_window:
result = np.apply_along_axis(calc, self.axis, values)
else:
result = calc(values)

if center:
result = self._center_window(result, window)

results.append(result)

return self._wrap_results(results, block_list, obj, exclude)

def aggregate(self, func, *args, **kwargs):
result, how = self._aggregate(func, *args, **kwargs)
if result is None:
Expand Down Expand Up @@ -707,64 +817,6 @@ def _pop_args(win_type, arg_names, kwargs):
# GH #15662. `False` makes symmetric window, rather than periodic.
return sig.get_window(win_type, window, False).astype(float)

def _apply_window(self, mean=True, **kwargs):
    """
    Applies a moving window of type ``window_type`` on the data.

    Parameters
    ----------
    mean : bool, default True
        If True computes weighted mean, else weighted sum
    **kwargs
        Forwarded to ``self._prep_window`` when building the window weights.

    Returns
    -------
    y : same type as input argument

    Raises
    ------
    DataError
        If a block's values cannot be prepared and the object being
        processed is not a DataFrame.
    """
    window = self._prep_window(**kwargs)
    center = self.center

    blocks, obj, _ = self._create_blocks()

    # Collect the blocks that are kept rather than deleting skipped ones
    # from a positional copy: deleting by the enumerate() index shifts every
    # later position, so a second skipped block would remove the wrong entry
    # (or raise IndexError) and ``results`` would no longer line up with
    # ``block_list``.
    block_list = []
    results = []
    exclude = []
    for b in blocks:
        try:
            values = self._prep_values(b.values)

        except (TypeError, NotImplementedError) as err:
            if isinstance(obj, ABCDataFrame):
                # Skip this block, remembering its columns as excluded.
                exclude.extend(b.columns)
                continue
            else:
                raise DataError("No numeric types to aggregate") from err

        block_list.append(b)

        if values.size == 0:
            results.append(values.copy())
            continue

        offset = _offset(window, center)
        # ``np.nan`` is the canonical spelling; the ``np.NaN`` alias was
        # removed in NumPy 2.0.
        additional_nans = np.array([np.nan] * offset)

        def f(arg, *args, **kwargs):
            minp = _use_window(self.min_periods, len(window))
            return libwindow.roll_window(
                # When centering, pad the tail with NaNs; the result is
                # trimmed again by ``_center_window`` below.
                np.concatenate((arg, additional_nans)) if center else arg,
                window,
                minp,
                avg=mean,
            )

        result = np.apply_along_axis(f, self.axis, values)

        if center:
            result = self._center_window(result, window)
        results.append(result)

    return self._wrap_results(results, block_list, obj, exclude)

_agg_see_also_doc = dedent(
"""
See Also
Expand Down Expand Up @@ -831,13 +883,13 @@ def aggregate(self, arg, *args, **kwargs):
@Appender(_shared_docs["sum"])
def sum(self, *args, **kwargs):
nv.validate_window_func("sum", args, kwargs)
return self._apply_window(mean=False, **kwargs)
return self._apply("roll_weighted_sum", **kwargs)

@Substitution(name="window")
@Appender(_shared_docs["mean"])
def mean(self, *args, **kwargs):
nv.validate_window_func("mean", args, kwargs)
return self._apply_window(mean=True, **kwargs)
return self._apply("roll_weighted_mean", **kwargs)


class _GroupByMixin(GroupByMixin):
Expand Down Expand Up @@ -883,105 +935,6 @@ class _Rolling(_Window):
def _constructor(self):
return Rolling

def _apply(
    self, func, name=None, window=None, center=None, check_minp=None, **kwargs
):
    """
    Rolling statistical measure using supplied function.

    Designed to be used with passed-in Cython array-based functions.

    Parameters
    ----------
    func : str/callable to apply
        A string is looked up as an attribute of ``libwindow`` and wrapped;
        a callable is invoked as
        ``func(values, window, min_periods=..., closed=...)``.
    name : str, optional
        name of this function (not used inside this method)
    window : int/array, default to _get_window()
    center : bool, default to self.center
    check_minp : function, default to _use_window
    **kwargs
        Extra keyword arguments forwarded to the ``libwindow`` function when
        ``func`` is given as a string.

    Returns
    -------
    y : type of input

    Raises
    ------
    DataError
        If a block's values cannot be prepared and the object being
        processed is not a DataFrame.
    ValueError
        If ``func`` is a string that does not name a ``libwindow`` function.
    """
    if center is None:
        center = self.center
    if window is None:
        window = self._get_window()

    if check_minp is None:
        check_minp = _use_window

    blocks, obj, index = self._create_blocks()
    index, indexi = self._get_index(index=index)

    # Collect the blocks that are kept rather than deleting skipped ones
    # from a positional copy: deleting by the enumerate() index shifts every
    # later position, so a second skipped block would remove the wrong entry
    # (or raise IndexError) and ``results`` would no longer line up with
    # ``block_list``.
    block_list = []
    results = []
    exclude = []
    for b in blocks:
        try:
            values = self._prep_values(b.values)

        except (TypeError, NotImplementedError) as err:
            if isinstance(obj, ABCDataFrame):
                # Skip this block, remembering its columns as excluded.
                exclude.extend(b.columns)
                continue
            else:
                raise DataError("No numeric types to aggregate") from err

        block_list.append(b)

        if values.size == 0:
            results.append(values.copy())
            continue

        # if we have a string function name, wrap it
        # (after the first wrap ``func`` is a callable, so later blocks
        # reuse the same wrapper)
        if isinstance(func, str):
            cfunc = getattr(libwindow, func, None)
            if cfunc is None:
                raise ValueError(
                    "we do not support this function "
                    "in libwindow.{func}".format(func=func)
                )

            def func(arg, window, min_periods=None, closed=None):
                minp = check_minp(min_periods, window)
                # ensure we are only rolling on floats
                arg = ensure_float64(arg)
                return cfunc(arg, window, minp, indexi, closed, **kwargs)

        # calculation function
        if center:
            offset = _offset(window, center)
            # ``np.nan`` is the canonical spelling; the ``np.NaN`` alias was
            # removed in NumPy 2.0.
            additional_nans = np.array([np.nan] * offset)

            def calc(x):
                # Pad the tail with NaNs; the result is trimmed again by
                # ``_center_window`` below.
                return func(
                    np.concatenate((x, additional_nans)),
                    window,
                    min_periods=self.min_periods,
                    closed=self.closed,
                )

        else:

            def calc(x):
                return func(
                    x, window, min_periods=self.min_periods, closed=self.closed
                )

        with np.errstate(all="ignore"):
            if values.ndim > 1:
                result = np.apply_along_axis(calc, self.axis, values)
            else:
                result = calc(values)

        if center:
            result = self._center_window(result, window)

        results.append(result)

    return self._wrap_results(results, block_list, obj, exclude)


class _Rolling_and_Expanding(_Rolling):

Expand Down