-
-
Notifications
You must be signed in to change notification settings - Fork 18.6k
CLN: Unify Window._apply_window and Rolling._apply functions #27403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
fb2ef95
646bb95
d387958
753f876
46bec7b
b4e6f69
ea76fc9
8823ee9
7b09f24
e2d2501
6494a1e
68d2b16
cc37244
96787e7
69994d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -182,7 +182,7 @@ def __getattr__(self, attr): | |
def _dir_additions(self): | ||
return self.obj._dir_additions() | ||
|
||
def _get_window(self, other=None): | ||
def _get_window(self, other=None, **kwargs): | ||
return self.window | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@property | ||
|
@@ -354,6 +354,112 @@ def _center_window(self, result, window): | |
result = np.copy(result[tuple(lead_indexer)]) | ||
return result | ||
|
||
def _get_roll(self, func, check_minp, index, **kwargs): | ||
|
||
cfunc = getattr(libwindow, func, None) | ||
if cfunc is None: | ||
raise ValueError( | ||
"we do not support this function " | ||
"in libwindow.{func}".format(func=func) | ||
) | ||
|
||
def func(arg, window, min_periods=None, closed=None): | ||
minp = check_minp(min_periods, window) | ||
# ensure we are only rolling on floats | ||
arg = ensure_float64(arg) | ||
|
||
return cfunc(arg, window, minp, index, closed, **kwargs) | ||
|
||
return func | ||
|
||
def _apply_roll(self, func, values): | ||
return np.apply_along_axis(func, self.axis, values) | ||
|
||
def _apply( | ||
self, func, name=None, window=None, center=None, check_minp=None, **kwargs | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
""" | ||
Rolling statistical measure using supplied function. | ||
|
||
Designed to be used with passed-in Cython array-based functions. | ||
|
||
Parameters | ||
---------- | ||
func : str/callable to apply | ||
name : str, optional | ||
name of this function | ||
window : int/array, default to _get_window() | ||
center : bool, default to self.center | ||
check_minp : function, default to _use_window | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add a description of what kwargs is here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we modify kwargs (or a copy of it) to first remove the window-function kwargs (those passed to _get_window)? The window rolling functions (passed to _get_roll_func) don't take kwargs at the moment, but if they ever do, passing the same kwargs to both will be a problem. |
||
Returns | ||
------- | ||
y : type of input | ||
""" | ||
if center is None: | ||
center = self.center | ||
|
||
if check_minp is None: | ||
check_minp = _use_window | ||
|
||
if window is None: | ||
window = self._get_window(**kwargs) | ||
|
||
blocks, obj, index = self._create_blocks() | ||
block_list = list(blocks) | ||
index, indexi = self._get_index(index=index) | ||
|
||
results = [] | ||
exclude = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Not sure what confuses mypy into requiring a type annotation for exclude (this is the reason for the CI failure). |
||
for i, b in enumerate(blocks): | ||
try: | ||
values = self._prep_values(b.values) | ||
|
||
except (TypeError, NotImplementedError): | ||
if isinstance(obj, ABCDataFrame): | ||
exclude.extend(b.columns) | ||
del block_list[i] | ||
continue | ||
else: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
if values.size == 0: | ||
results.append(values.copy()) | ||
continue | ||
|
||
# if we have a string function name, wrap it | ||
if isinstance(func, str): | ||
func = self._get_roll(func, check_minp, indexi, **kwargs) | ||
|
||
# calculation function | ||
if center: | ||
offset = _offset(window, center) | ||
additional_nans = np.array([np.NaN] * offset) | ||
|
||
def calc(x): | ||
return func( | ||
np.concatenate((x, additional_nans)), | ||
window, | ||
min_periods=self.min_periods, | ||
closed=self.closed, | ||
) | ||
|
||
else: | ||
|
||
def calc(x): | ||
return func( | ||
x, window, min_periods=self.min_periods, closed=self.closed | ||
) | ||
|
||
result = self._apply_roll(calc, values) | ||
|
||
if center: | ||
result = self._center_window(result, window) | ||
|
||
results.append(result) | ||
|
||
return self._wrap_results(results, block_list, obj, exclude) | ||
|
||
def aggregate(self, func, *args, **kwargs): | ||
result, how = self._aggregate(func, *args, **kwargs) | ||
if result is None: | ||
|
@@ -661,13 +767,13 @@ def validate(self): | |
else: | ||
raise ValueError("Invalid window {0}".format(window)) | ||
|
||
def _prep_window(self, **kwargs): | ||
def _get_window(self, **kwargs): | ||
""" | ||
Provide validation for our window type, return the window | ||
we have already been validated. | ||
""" | ||
|
||
window = self._get_window() | ||
window = self.window | ||
if isinstance(window, (list, tuple, np.ndarray)): | ||
return com.asarray_tuplesafe(window).astype(float) | ||
elif is_integer(window): | ||
|
@@ -707,63 +813,20 @@ def _pop_args(win_type, arg_names, kwargs): | |
# GH #15662. `False` makes symmetric window, rather than periodic. | ||
return sig.get_window(win_type, window, False).astype(float) | ||
|
||
def _apply_window(self, mean=True, **kwargs): | ||
""" | ||
Applies a moving window of type ``window_type`` on the data. | ||
def _get_roll(self, func, check_minp, *args, **kwargs): | ||
|
||
Parameters | ||
---------- | ||
mean : bool, default True | ||
If True computes weighted mean, else weighted sum | ||
|
||
Returns | ||
------- | ||
y : same type as input argument | ||
|
||
""" | ||
window = self._prep_window(**kwargs) | ||
center = self.center | ||
|
||
blocks, obj, index = self._create_blocks() | ||
block_list = list(blocks) | ||
|
||
results = [] | ||
exclude = [] | ||
for i, b in enumerate(blocks): | ||
try: | ||
values = self._prep_values(b.values) | ||
|
||
except (TypeError, NotImplementedError): | ||
if isinstance(obj, ABCDataFrame): | ||
exclude.extend(b.columns) | ||
del block_list[i] | ||
continue | ||
else: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
if values.size == 0: | ||
results.append(values.copy()) | ||
continue | ||
|
||
offset = _offset(window, center) | ||
additional_nans = np.array([np.NaN] * offset) | ||
|
||
def f(arg, *args, **kwargs): | ||
minp = _use_window(self.min_periods, len(window)) | ||
return libwindow.roll_window( | ||
np.concatenate((arg, additional_nans)) if center else arg, | ||
window, | ||
minp, | ||
avg=mean, | ||
) | ||
|
||
result = np.apply_along_axis(f, self.axis, values) | ||
cfunc = getattr(libwindow, func, None) | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if cfunc is None: | ||
raise ValueError( | ||
"we do not support this function " | ||
"in libwindow.{func}".format(func=func) | ||
) | ||
|
||
if center: | ||
result = self._center_window(result, window) | ||
results.append(result) | ||
def func(arg, window, min_periods=None, closed=None): | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
minp = check_minp(min_periods, len(window)) | ||
return cfunc(arg, window, minp) | ||
|
||
return self._wrap_results(results, block_list, obj, exclude) | ||
return func | ||
|
||
_agg_see_also_doc = dedent( | ||
""" | ||
|
@@ -831,13 +894,13 @@ def aggregate(self, arg, *args, **kwargs): | |
@Appender(_shared_docs["sum"]) | ||
def sum(self, *args, **kwargs): | ||
nv.validate_window_func("sum", args, kwargs) | ||
return self._apply_window(mean=False, **kwargs) | ||
return self._apply("roll_weighted_sum", **kwargs) | ||
|
||
@Substitution(name="window") | ||
@Appender(_shared_docs["mean"]) | ||
def mean(self, *args, **kwargs): | ||
nv.validate_window_func("mean", args, kwargs) | ||
return self._apply_window(mean=True, **kwargs) | ||
return self._apply("roll_weighted_mean", **kwargs) | ||
|
||
|
||
class _GroupByMixin(GroupByMixin): | ||
|
@@ -883,104 +946,15 @@ class _Rolling(_Window): | |
def _constructor(self): | ||
return Rolling | ||
|
||
def _apply( | ||
self, func, name=None, window=None, center=None, check_minp=None, **kwargs | ||
): | ||
""" | ||
Rolling statistical measure using supplied function. | ||
|
||
Designed to be used with passed-in Cython array-based functions. | ||
|
||
Parameters | ||
---------- | ||
func : str/callable to apply | ||
name : str, optional | ||
name of this function | ||
window : int/array, default to _get_window() | ||
center : bool, default to self.center | ||
check_minp : function, default to _use_window | ||
|
||
Returns | ||
------- | ||
y : type of input | ||
""" | ||
if center is None: | ||
center = self.center | ||
if window is None: | ||
window = self._get_window() | ||
|
||
if check_minp is None: | ||
check_minp = _use_window | ||
|
||
blocks, obj, index = self._create_blocks() | ||
block_list = list(blocks) | ||
index, indexi = self._get_index(index=index) | ||
|
||
results = [] | ||
exclude = [] | ||
for i, b in enumerate(blocks): | ||
try: | ||
values = self._prep_values(b.values) | ||
|
||
except (TypeError, NotImplementedError): | ||
if isinstance(obj, ABCDataFrame): | ||
exclude.extend(b.columns) | ||
del block_list[i] | ||
continue | ||
else: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
if values.size == 0: | ||
results.append(values.copy()) | ||
continue | ||
|
||
# if we have a string function name, wrap it | ||
if isinstance(func, str): | ||
cfunc = getattr(libwindow, func, None) | ||
if cfunc is None: | ||
raise ValueError( | ||
"we do not support this function " | ||
"in libwindow.{func}".format(func=func) | ||
) | ||
|
||
def func(arg, window, min_periods=None, closed=None): | ||
minp = check_minp(min_periods, window) | ||
# ensure we are only rolling on floats | ||
arg = ensure_float64(arg) | ||
return cfunc(arg, window, minp, indexi, closed, **kwargs) | ||
|
||
# calculation function | ||
if center: | ||
offset = _offset(window, center) | ||
additional_nans = np.array([np.NaN] * offset) | ||
|
||
def calc(x): | ||
return func( | ||
np.concatenate((x, additional_nans)), | ||
window, | ||
min_periods=self.min_periods, | ||
closed=self.closed, | ||
) | ||
def _apply_roll(self, func, values): | ||
|
||
with np.errstate(all="ignore"): | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if values.ndim > 1: | ||
result = np.apply_along_axis(func, self.axis, values) | ||
else: | ||
result = func(values) | ||
|
||
def calc(x): | ||
return func( | ||
x, window, min_periods=self.min_periods, closed=self.closed | ||
) | ||
|
||
with np.errstate(all="ignore"): | ||
if values.ndim > 1: | ||
result = np.apply_along_axis(calc, self.axis, values) | ||
else: | ||
result = calc(values) | ||
|
||
if center: | ||
result = self._center_window(result, window) | ||
|
||
results.append(result) | ||
|
||
return self._wrap_results(results, block_list, obj, exclude) | ||
return result | ||
|
||
|
||
class _Rolling_and_Expanding(_Rolling): | ||
|
@@ -2033,7 +2007,7 @@ def __init__(self, obj, min_periods=1, center=False, axis=0, **kwargs): | |
def _constructor(self): | ||
return Expanding | ||
|
||
def _get_window(self, other=None): | ||
def _get_window(self, other=None, **kwargs): | ||
""" | ||
Get the window length over which to perform some operation. | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.