[ArrayManager] GroupBy cython aggregations (no fallback) #39885
```diff
@@ -26,6 +26,7 @@
     Mapping,
     Optional,
     Sequence,
+    Tuple,
     Type,
     TypeVar,
     Union,
@@ -43,6 +44,7 @@
     ArrayLike,
     FrameOrSeries,
     FrameOrSeriesUnion,
+    Manager,
 )
 from pandas.util._decorators import (
     Appender,
@@ -109,7 +111,10 @@
     all_indexes_same,
 )
 import pandas.core.indexes.base as ibase
-from pandas.core.internals import BlockManager
+from pandas.core.internals import (
+    ArrayManager,
+    BlockManager,
+)
 from pandas.core.series import Series
 from pandas.core.util.numba_ import maybe_use_numba

```
```diff
@@ -187,6 +192,31 @@ def pinner(cls):
     return pinner


+def _cast_agg_result(
+    result, values: ArrayLike, how: str, reshape1d: bool = True
+) -> ArrayLike:
+    # see if we can cast the values to the desired dtype
+    # this may not be the original dtype
+    assert not isinstance(result, DataFrame)
+
+    dtype = maybe_cast_result_dtype(values.dtype, how)
+    result = maybe_downcast_numeric(result, dtype)
+
+    if isinstance(values, Categorical) and isinstance(result, np.ndarray):
+        # If the Categorical op didn't raise, it is dtype-preserving
+        result = type(values)._from_sequence(result.ravel(), dtype=values.dtype)
+        # Note this will have result.dtype == dtype from above
+
+    elif reshape1d and isinstance(result, np.ndarray) and result.ndim == 1:
+        # We went through a SeriesGroupByPath and need to reshape
+        # GH#32223 includes case with IntegerArray values
+        result = result.reshape(1, -1)
+        # test_groupby_duplicate_columns gets here with
+        # result.dtype == int64, values.dtype=object, how="min"
+
+    return result
+
+
 @pin_allowlisted_properties(Series, base.series_apply_allowlist)
 class SeriesGroupBy(GroupBy[Series]):
     _apply_allowlist = base.series_apply_allowlist
```
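The new `reshape1d` flag captures a layout difference between the two managers: BlockManager blocks are 2-D (an aggregation result lands in a block of shape `(n_columns, n_groups)`), while ArrayManager keeps each column as a 1-D array. A minimal NumPy sketch of the distinction (illustrative values only, not pandas code):

```python
import numpy as np

# a 1-D aggregation result: one value per group
result = np.array([1.5, 2.0, 3.5])

# BlockManager path (reshape1d=True): a 1-D result must become a
# single-row 2-D array before it can be stored in a block
block_values = result.reshape(1, -1)
assert block_values.shape == (1, 3)

# ArrayManager path (reshape1d=False): columns are stored as 1-D
# arrays, so the result is kept as-is
assert result.ndim == 1
```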
```diff
@@ -1071,10 +1101,18 @@ def _iterate_slices(self) -> Iterable[Series]:
     def _cython_agg_general(
         self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
     ) -> DataFrame:
-        agg_mgr = self._cython_agg_blocks(
-            how, alt=alt, numeric_only=numeric_only, min_count=min_count
-        )
-        return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
+        if isinstance(self.obj._mgr, BlockManager):
+            agg_mgr = self._cython_agg_blocks(
+                how, alt=alt, numeric_only=numeric_only, min_count=min_count
+            )
+            return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
+        elif isinstance(self.obj._mgr, ArrayManager):
+            agg_arrays, columns = self._cython_agg_arrays(
+                how, alt=alt, numeric_only=numeric_only, min_count=min_count
+            )
+            return self._wrap_agged_arrays(agg_arrays, columns)
+        else:
+            raise TypeError("Unknown manager type")

     def _cython_agg_blocks(
         self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
```
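The dispatch on `self.obj._mgr` means a user-level aggregation picks the right path automatically. A hedged usage sketch, assuming the experimental `mode.data_manager` option from the same development cycle is available to opt in to ArrayManager-backed frames:

```python
import pandas as pd

# opt in to the experimental ArrayManager storage (assumed option
# name from the pandas development branch this PR targets)
pd.set_option("mode.data_manager", "array")

df = pd.DataFrame({"key": ["a", "a", "b"], "val": [1.0, 2.0, 3.0]})

# mean() reaches _cython_agg_general, which now routes to
# _cython_agg_arrays because df._mgr is an ArrayManager
print(df.groupby("key").mean())
#      val
# key
# a    1.5
# b    3.0
```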
```diff
@@ -1085,28 +1123,6 @@ def _cython_agg_blocks(
         if numeric_only:
             data = data.get_numeric_data(copy=False)

-        def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike:
-            # see if we can cast the values to the desired dtype
-            # this may not be the original dtype
-            assert not isinstance(result, DataFrame)
-
-            dtype = maybe_cast_result_dtype(values.dtype, how)
-            result = maybe_downcast_numeric(result, dtype)
-
-            if isinstance(values, Categorical) and isinstance(result, np.ndarray):
-                # If the Categorical op didn't raise, it is dtype-preserving
-                result = type(values)._from_sequence(result.ravel(), dtype=values.dtype)
-                # Note this will have result.dtype == dtype from above
-
-            elif isinstance(result, np.ndarray) and result.ndim == 1:
-                # We went through a SeriesGroupByPath and need to reshape
-                # GH#32223 includes case with IntegerArray values
-                result = result.reshape(1, -1)
-                # test_groupby_duplicate_columns gets here with
-                # result.dtype == int64, values.dtype=object, how="min"
-
-            return result
-
         def py_fallback(bvalues: ArrayLike) -> ArrayLike:
             # if self.grouper.aggregate fails, we fall back to a pure-python
             # solution
```
```diff
@@ -1169,7 +1185,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:

                 result = py_fallback(bvalues)

-            return cast_agg_result(result, bvalues, how)
+            return _cast_agg_result(result, bvalues, how)

         # TypeError -> we may have an exception in trying to aggregate
         # continue and exclude the block
```
```diff
@@ -1181,6 +1197,63 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:

         return new_mgr

+    def _cython_agg_arrays(
+        self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
+    ) -> Tuple[List[ArrayLike], Index]:
+
+        data: ArrayManager = self._get_data_to_aggregate()
+
+        if numeric_only:
+            data = data.get_numeric_data(copy=False)
+
+        def py_fallback(bvalues: ArrayLike) -> ArrayLike:
+            # TODO(ArrayManager)
+            raise NotImplementedError
+
+        def array_func(values: ArrayLike) -> ArrayLike:
+
+            try:
+                result = self.grouper._cython_operation(
+                    "aggregate", values, how, axis=1, min_count=min_count
+                )
+            except NotImplementedError:
+                # generally if we have numeric_only=False
+                # and non-applicable functions
+                # try to python agg
+
+                if alt is None:
+                    # we cannot perform the operation
+                    # in an alternate way, exclude the block
+                    assert how == "ohlc"
+                    raise
+
+                result = py_fallback(values)
+
+            return _cast_agg_result(result, values, how, reshape1d=False)
+
+        result_arrays = [array_func(arr) for arr in data.arrays]
+
+        if not len(result_arrays):
+            raise DataError("No numeric types to aggregate")
+
+        return result_arrays, data.axes[0]
+
+    def _wrap_agged_arrays(self, arrays: List[ArrayLike], columns: Index) -> DataFrame:
+        if not self.as_index:
+            index = np.arange(arrays[0].shape[0])
+            mgr = ArrayManager(arrays, axes=[index, columns])
+            result = self.obj._constructor(mgr)
+            self._insert_inaxis_grouper_inplace(result)
+        else:
+            index = self.grouper.result_index
+            mgr = ArrayManager(arrays, axes=[index, columns])
+            result = self.obj._constructor(mgr)
+
+        if self.axis == 1:
+            result = result.T
+
+        return self._reindex_output(result)._convert(datetime=True)
+
     def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:
         if self.grouper.nkeys != 1:
             raise AssertionError("Number of keys must be 1")
```

A reviewer left a (truncated) comment on `_wrap_agged_arrays`: "if/when BlockManager is gone, then …"
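`_cython_agg_arrays` is the column-wise counterpart of `_cython_agg_blocks`: it maps one aggregation over each 1-D column array instead of over 2-D blocks. A pure-NumPy sketch of that pattern, where `group_mean` is a hypothetical stand-in for `grouper._cython_operation("aggregate", ...)`:

```python
import numpy as np

def group_mean(values: np.ndarray, group_ids: np.ndarray, ngroups: int) -> np.ndarray:
    """Hypothetical stand-in for the cython groupby-mean kernel."""
    sums = np.zeros(ngroups)
    counts = np.zeros(ngroups)
    np.add.at(sums, group_ids, values)   # scatter-add values per group
    np.add.at(counts, group_ids, 1)      # count rows per group
    return sums / counts

# ArrayManager stores one 1-D array per column
arrays = [np.array([1.0, 2.0, 3.0]), np.array([10.0, 20.0, 30.0])]
group_ids = np.array([0, 0, 1])  # row -> group label codes

# mirrors: result_arrays = [array_func(arr) for arr in data.arrays]
result_arrays = [group_mean(arr, group_ids, ngroups=2) for arr in arrays]
# [array([1.5, 3. ]), array([15., 30.])]
```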
```diff
@@ -1663,7 +1736,7 @@ def _wrap_frame_output(self, result, obj: DataFrame) -> DataFrame:
         else:
             return self.obj._constructor(result, index=obj.index, columns=result_index)

-    def _get_data_to_aggregate(self) -> BlockManager:
+    def _get_data_to_aggregate(self) -> Manager:
         obj = self._obj_with_exclusions
         if self.axis == 1:
             return obj.T._mgr
```
The second file changes `apply_with_block` in the internals, so that a plain timedelta64 ndarray is not mistaken for a TimedeltaArray:
```diff
@@ -330,7 +330,7 @@ def apply_with_block(self: T, f, align_keys=None, **kwargs) -> T:
             if hasattr(arr, "tz") and arr.tz is None:  # type: ignore[union-attr]
                 # DatetimeArray needs to be converted to ndarray for DatetimeBlock
                 arr = arr._data  # type: ignore[union-attr]
-            elif arr.dtype.kind == "m":
+            elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray):
                 # TimedeltaArray needs to be converted to ndarray for TimedeltaBlock
                 arr = arr._data  # type: ignore[union-attr]
             if isinstance(arr, np.ndarray):
```

Review discussion on this check:

- "could combine this with previous check as …"
- "That would be nice, but the problem is that we still need to keep DatetimeArray intact for DatetimeTZBlock. So we would still need the … Edit: the diff would be:"

  ```diff
  -            if hasattr(arr, "tz") and arr.tz is None:  # type: ignore[union-attr]
  -                # DatetimeArray needs to be converted to ndarray for DatetimeBlock
  -                arr = arr._data  # type: ignore[union-attr]
  -            elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray):
  -                # TimedeltaArray needs to be converted to ndarray for TimedeltaBlock
  +            if (
  +                arr.dtype.kind == "m"
  +                and not isinstance(arr, np.ndarray)
  +                and getattr(arr, "tz", None) is None
  +            ):
  +                # DatetimeArray/TimedeltaArray needs to be converted to ndarray
  +                # for DatetimeBlock/TimedeltaBlock (except DatetimeArray with tz,
  +                # which needs to be preserved for DatetimeTZBlock)
                   arr = arr._data  # type: ignore[union-attr]
  ```

- "instead of …"
- "That still gives the same length of the …"
- "yah the only possible difference is for mypy"
The third file updates the groupby cython aggregation tests:
```diff
@@ -41,7 +41,10 @@
         "max",
     ],
 )
-def test_cythonized_aggers(op_name):
+def test_cythonized_aggers(op_name, using_array_manager):
+    if using_array_manager and op_name in {"count", "sem"}:
+        # TODO(ArrayManager) groupby count/sem
+        pytest.skip("ArrayManager groupby count/sem not yet implemented")
     data = {
         "A": [0, 0, 0, 0, 1, 1, 1, 1, 1, 1.0, np.nan, np.nan],
         "B": ["A", "B"] * 6,
```

Review discussion on the skip:

- "can the add_marker pattern be used here?"
- "We use the … Now, that said, I should maybe actually start using xfail instead of skip for the "skip_array_manager_not_yet_implemented", so it's easier to notice if certain tests can be unskipped when more features get implemented. So will already start using the xfail as you suggest here."
- "Actually, in the meantime I fixed … (but a sign that using xfail instead of skip is actually a good idea ;))"
- "sounds good"
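The "add_marker pattern" referenced above attaches an xfail marker at runtime through pytest's `request` fixture; unlike `pytest.skip`, the test then surfaces as an unexpected pass (XPASS) once the feature lands. A hedged sketch of how this test could adopt it (an illustration, not the code in this PR):

```python
import pytest

def test_cythonized_aggers(op_name, using_array_manager, request):
    if using_array_manager and op_name in {"count", "sem"}:
        # xfail instead of skip: once groupby count/sem works under
        # ArrayManager, the XPASS result flags that this marker can
        # be removed
        mark = pytest.mark.xfail(
            reason="ArrayManager groupby count/sem not yet implemented"
        )
        request.node.add_marker(mark)
    ...
```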
```diff
@@ -273,15 +276,18 @@ def test_cython_with_timestamp_and_nat(op, data):
         "cummax",
     ],
 )
-def test_read_only_buffer_source_agg(agg):
+def test_read_only_buffer_source_agg(agg, using_array_manager):
     # https://github.com/pandas-dev/pandas/issues/36014
     df = DataFrame(
         {
             "sepal_length": [5.1, 4.9, 4.7, 4.6, 5.0],
             "species": ["setosa", "setosa", "setosa", "setosa", "setosa"],
         }
     )
-    df._mgr.blocks[0].values.flags.writeable = False
+    if using_array_manager:
+        df._mgr.arrays[0].flags.writeable = False
+    else:
+        df._mgr.blocks[0].values.flags.writeable = False

     result = df.groupby(["species"]).agg({"sepal_length": agg})
     expected = df.copy().groupby(["species"]).agg({"sepal_length": agg})
```
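The test flips `flags.writeable` to emulate data backed by a read-only buffer (the subject of GH#36014); under ArrayManager the column array is reached via `_mgr.arrays[0]` rather than `_mgr.blocks[0].values`. A standalone NumPy illustration of what the flag does:

```python
import numpy as np

arr = np.array([5.1, 4.9, 4.7])
arr.flags.writeable = False

# reading is fine: groupby aggregation only needs read access
print(arr.sum())  # 14.7

# writing raises; a cython kernel whose signature demands a writable
# buffer fails on such an array with a similar ValueError
try:
    arr[0] = 0.0
except ValueError as err:
    print(err)  # assignment destination is read-only
```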