-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
CLN: Unify Window._apply_window and Rolling._apply functions #27403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
fb2ef95
646bb95
d387958
753f876
46bec7b
b4e6f69
ea76fc9
8823ee9
7b09f24
e2d2501
6494a1e
68d2b16
cc37244
96787e7
69994d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -354,6 +354,116 @@ def _center_window(self, result, window): | |
result = np.copy(result[tuple(lead_indexer)]) | ||
return result | ||
|
||
def _apply( | ||
self, func, name=None, window=None, center=None, check_minp=None, **kwargs | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
""" | ||
Rolling statistical measure using supplied function. | ||
|
||
Designed to be used with passed-in Cython array-based functions. | ||
|
||
Parameters | ||
---------- | ||
func : str/callable to apply | ||
name : str, optional | ||
name of this function | ||
window : int/array, default to _get_window() | ||
center : bool, default to self.center | ||
check_minp : function, default to _use_window | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add what kwargs is here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we modify kwargs (or copy of it) to remove window function (passed to _get_window) kwargs first. window rolling functions (passed to _get_roll_func) don't take kwargs but when they do it will be a problem |
||
Returns | ||
------- | ||
y : type of input | ||
""" | ||
if center is None: | ||
center = self.center | ||
|
||
if check_minp is None: | ||
check_minp = _use_window | ||
|
||
blocks, obj, index = self._create_blocks() | ||
block_list = list(blocks) | ||
|
||
is_window = isinstance(self, Window) | ||
ihsansecer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if is_window: | ||
window = self._prep_window(**kwargs) | ||
else: | ||
window = self._get_window() | ||
index, indexi = self._get_index(index=index) | ||
|
||
results = [] | ||
exclude = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
not sure what confuses mypy to require type for exclude (this is the reason of ci fail) |
||
for i, b in enumerate(blocks): | ||
try: | ||
values = self._prep_values(b.values) | ||
|
||
except (TypeError, NotImplementedError): | ||
if isinstance(obj, ABCDataFrame): | ||
exclude.extend(b.columns) | ||
del block_list[i] | ||
continue | ||
else: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
if values.size == 0: | ||
results.append(values.copy()) | ||
continue | ||
|
||
# if we have a string function name, wrap it | ||
if isinstance(func, str): | ||
cfunc = getattr(libwindow, func, None) | ||
if cfunc is None: | ||
raise ValueError( | ||
"we do not support this function " | ||
"in libwindow.{func}".format(func=func) | ||
) | ||
|
||
def func(arg, window, min_periods=None, closed=None): | ||
# ensure we are only rolling on floats | ||
arg = ensure_float64(arg) | ||
|
||
if is_window: | ||
minp = check_minp(min_periods, len(window)) | ||
return cfunc(arg, window, minp) | ||
|
||
else: | ||
minp = check_minp(min_periods, window) | ||
return cfunc(arg, window, minp, indexi, closed, **kwargs) | ||
|
||
# calculation function | ||
if center: | ||
offset = _offset(window, center) | ||
additional_nans = np.array([np.NaN] * offset) | ||
|
||
def calc(x): | ||
return func( | ||
np.concatenate((x, additional_nans)), | ||
window, | ||
min_periods=self.min_periods, | ||
closed=self.closed, | ||
) | ||
|
||
else: | ||
|
||
def calc(x): | ||
return func( | ||
x, window, min_periods=self.min_periods, closed=self.closed | ||
) | ||
|
||
with np.errstate(all="ignore"): | ||
if values.ndim > 1 or is_window: | ||
result = np.apply_along_axis(calc, self.axis, values) | ||
else: | ||
result = calc(values) | ||
|
||
if center: | ||
result = self._center_window(result, window) | ||
|
||
results.append(result) | ||
|
||
return self._wrap_results(results, block_list, obj, exclude) | ||
|
||
def aggregate(self, func, *args, **kwargs): | ||
result, how = self._aggregate(func, *args, **kwargs) | ||
if result is None: | ||
|
@@ -707,64 +817,6 @@ def _pop_args(win_type, arg_names, kwargs): | |
# GH #15662. `False` makes symmetric window, rather than periodic. | ||
return sig.get_window(win_type, window, False).astype(float) | ||
|
||
def _apply_window(self, mean=True, **kwargs): | ||
""" | ||
Applies a moving window of type ``window_type`` on the data. | ||
|
||
Parameters | ||
---------- | ||
mean : bool, default True | ||
If True computes weighted mean, else weighted sum | ||
|
||
Returns | ||
------- | ||
y : same type as input argument | ||
|
||
""" | ||
window = self._prep_window(**kwargs) | ||
center = self.center | ||
|
||
blocks, obj, index = self._create_blocks() | ||
block_list = list(blocks) | ||
|
||
results = [] | ||
exclude = [] | ||
for i, b in enumerate(blocks): | ||
try: | ||
values = self._prep_values(b.values) | ||
|
||
except (TypeError, NotImplementedError): | ||
if isinstance(obj, ABCDataFrame): | ||
exclude.extend(b.columns) | ||
del block_list[i] | ||
continue | ||
else: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
if values.size == 0: | ||
results.append(values.copy()) | ||
continue | ||
|
||
offset = _offset(window, center) | ||
additional_nans = np.array([np.NaN] * offset) | ||
|
||
def f(arg, *args, **kwargs): | ||
minp = _use_window(self.min_periods, len(window)) | ||
return libwindow.roll_window( | ||
np.concatenate((arg, additional_nans)) if center else arg, | ||
window, | ||
minp, | ||
avg=mean, | ||
) | ||
|
||
result = np.apply_along_axis(f, self.axis, values) | ||
|
||
if center: | ||
result = self._center_window(result, window) | ||
results.append(result) | ||
|
||
return self._wrap_results(results, block_list, obj, exclude) | ||
|
||
_agg_see_also_doc = dedent( | ||
""" | ||
See Also | ||
|
@@ -831,13 +883,13 @@ def aggregate(self, arg, *args, **kwargs): | |
@Appender(_shared_docs["sum"]) | ||
def sum(self, *args, **kwargs): | ||
nv.validate_window_func("sum", args, kwargs) | ||
return self._apply_window(mean=False, **kwargs) | ||
return self._apply("roll_weighted_sum", **kwargs) | ||
|
||
@Substitution(name="window") | ||
@Appender(_shared_docs["mean"]) | ||
def mean(self, *args, **kwargs): | ||
nv.validate_window_func("mean", args, kwargs) | ||
return self._apply_window(mean=True, **kwargs) | ||
return self._apply("roll_weighted_mean", **kwargs) | ||
|
||
|
||
class _GroupByMixin(GroupByMixin): | ||
|
@@ -883,105 +935,6 @@ class _Rolling(_Window): | |
def _constructor(self): | ||
return Rolling | ||
|
||
def _apply( | ||
self, func, name=None, window=None, center=None, check_minp=None, **kwargs | ||
): | ||
""" | ||
Rolling statistical measure using supplied function. | ||
|
||
Designed to be used with passed-in Cython array-based functions. | ||
|
||
Parameters | ||
---------- | ||
func : str/callable to apply | ||
name : str, optional | ||
name of this function | ||
window : int/array, default to _get_window() | ||
center : bool, default to self.center | ||
check_minp : function, default to _use_window | ||
|
||
Returns | ||
------- | ||
y : type of input | ||
""" | ||
if center is None: | ||
center = self.center | ||
if window is None: | ||
window = self._get_window() | ||
|
||
if check_minp is None: | ||
check_minp = _use_window | ||
|
||
blocks, obj, index = self._create_blocks() | ||
block_list = list(blocks) | ||
index, indexi = self._get_index(index=index) | ||
|
||
results = [] | ||
exclude = [] | ||
for i, b in enumerate(blocks): | ||
try: | ||
values = self._prep_values(b.values) | ||
|
||
except (TypeError, NotImplementedError): | ||
if isinstance(obj, ABCDataFrame): | ||
exclude.extend(b.columns) | ||
del block_list[i] | ||
continue | ||
else: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
if values.size == 0: | ||
results.append(values.copy()) | ||
continue | ||
|
||
# if we have a string function name, wrap it | ||
if isinstance(func, str): | ||
cfunc = getattr(libwindow, func, None) | ||
if cfunc is None: | ||
raise ValueError( | ||
"we do not support this function " | ||
"in libwindow.{func}".format(func=func) | ||
) | ||
|
||
def func(arg, window, min_periods=None, closed=None): | ||
minp = check_minp(min_periods, window) | ||
# ensure we are only rolling on floats | ||
arg = ensure_float64(arg) | ||
return cfunc(arg, window, minp, indexi, closed, **kwargs) | ||
|
||
# calculation function | ||
if center: | ||
offset = _offset(window, center) | ||
additional_nans = np.array([np.NaN] * offset) | ||
|
||
def calc(x): | ||
return func( | ||
np.concatenate((x, additional_nans)), | ||
window, | ||
min_periods=self.min_periods, | ||
closed=self.closed, | ||
) | ||
|
||
else: | ||
|
||
def calc(x): | ||
return func( | ||
x, window, min_periods=self.min_periods, closed=self.closed | ||
) | ||
|
||
with np.errstate(all="ignore"): | ||
if values.ndim > 1: | ||
result = np.apply_along_axis(calc, self.axis, values) | ||
else: | ||
result = calc(values) | ||
|
||
if center: | ||
result = self._center_window(result, window) | ||
|
||
results.append(result) | ||
|
||
return self._wrap_results(results, block_list, obj, exclude) | ||
|
||
|
||
class _Rolling_and_Expanding(_Rolling): | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.