-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
Remove blocks from GroupBy Code #28782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
b694b09
ee85e5a
32d1b6b
6115323
2324c0c
5de118f
e6b5fd1
44a0c6a
9efdab6
654ccf2
2d3c5dd
6c521f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,7 +56,6 @@ | |
) | ||
from pandas.core.index import Index, MultiIndex, _all_indexes_same | ||
import pandas.core.indexes.base as ibase | ||
from pandas.core.internals import BlockManager, make_block | ||
from pandas.core.series import Series | ||
|
||
from pandas.plotting import boxplot_frame_groupby | ||
|
@@ -147,93 +146,6 @@ def _iterate_slices(self): | |
continue | ||
yield val, slicer(val) | ||
|
||
def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1): | ||
new_items, new_blocks = self._cython_agg_blocks( | ||
how, alt=alt, numeric_only=numeric_only, min_count=min_count | ||
) | ||
return self._wrap_agged_blocks(new_items, new_blocks) | ||
|
||
_block_agg_axis = 0 | ||
|
||
def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1): | ||
# TODO: the actual managing of mgr_locs is a PITA | ||
# here, it should happen via BlockManager.combine | ||
|
||
data, agg_axis = self._get_data_to_aggregate() | ||
|
||
if numeric_only: | ||
data = data.get_numeric_data(copy=False) | ||
|
||
new_blocks = [] | ||
new_items = [] | ||
deleted_items = [] | ||
no_result = object() | ||
for block in data.blocks: | ||
# Avoid inheriting result from earlier in the loop | ||
result = no_result | ||
locs = block.mgr_locs.as_array | ||
try: | ||
result, _ = self.grouper.aggregate( | ||
block.values, how, axis=agg_axis, min_count=min_count | ||
) | ||
except NotImplementedError: | ||
# generally if we have numeric_only=False | ||
# and non-applicable functions | ||
# try to python agg | ||
|
||
if alt is None: | ||
# we cannot perform the operation | ||
# in an alternate way, exclude the block | ||
deleted_items.append(locs) | ||
continue | ||
|
||
# call our grouper again with only this block | ||
obj = self.obj[data.items[locs]] | ||
s = groupby(obj, self.grouper) | ||
try: | ||
result = s.aggregate(lambda x: alt(x, axis=self.axis)) | ||
except TypeError: | ||
# we may have an exception in trying to aggregate | ||
# continue and exclude the block | ||
deleted_items.append(locs) | ||
continue | ||
finally: | ||
if result is not no_result: | ||
# see if we can cast the block back to the original dtype | ||
result = maybe_downcast_numeric(result, block.dtype) | ||
newb = block.make_block(result) | ||
|
||
new_items.append(locs) | ||
new_blocks.append(newb) | ||
|
||
if len(new_blocks) == 0: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
# reset the locs in the blocks to correspond to our | ||
# current ordering | ||
indexer = np.concatenate(new_items) | ||
new_items = data.items.take(np.sort(indexer)) | ||
|
||
if len(deleted_items): | ||
|
||
# we need to adjust the indexer to account for the | ||
# items we have removed | ||
# really should be done in internals :< | ||
|
||
deleted = np.concatenate(deleted_items) | ||
ai = np.arange(len(data)) | ||
mask = np.zeros(len(data)) | ||
mask[deleted] = 1 | ||
indexer = (ai - mask.cumsum())[indexer] | ||
|
||
offset = 0 | ||
for b in new_blocks: | ||
loc = len(b.mgr_locs) | ||
b.mgr_locs = indexer[offset : (offset + loc)] | ||
offset += loc | ||
|
||
return new_items, new_blocks | ||
|
||
def aggregate(self, func, *args, **kwargs): | ||
_level = kwargs.pop("_level", None) | ||
|
||
|
@@ -1385,8 +1297,6 @@ class DataFrameGroupBy(NDFrameGroupBy): | |
|
||
_apply_whitelist = base.dataframe_apply_whitelist | ||
|
||
_block_agg_axis = 1 | ||
|
||
_agg_see_also_doc = dedent( | ||
""" | ||
See Also | ||
|
@@ -1525,13 +1435,6 @@ def _wrap_generic_output(self, result, obj): | |
else: | ||
return DataFrame(result, index=obj.index, columns=result_index) | ||
|
||
def _get_data_to_aggregate(self): | ||
obj = self._obj_with_exclusions | ||
if self.axis == 1: | ||
return obj.T._data, 1 | ||
else: | ||
return obj._data, 1 | ||
|
||
def _insert_inaxis_grouper_inplace(self, result): | ||
# zip in reverse so we can always insert at loc 0 | ||
izip = zip( | ||
|
@@ -1550,18 +1453,25 @@ def _insert_inaxis_grouper_inplace(self, result): | |
result.insert(0, name, lev) | ||
|
||
def _wrap_aggregated_output(self, output, names=None): | ||
agg_axis = 0 if self.axis == 1 else 1 | ||
agg_labels = self._obj_with_exclusions._get_axis(agg_axis) | ||
index = self.grouper.result_index | ||
WillAyd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if isinstance(output, dict): | ||
result = DataFrame(output, index=index) | ||
else: | ||
agg_axis = 0 if self.axis == 1 else 1 | ||
agg_labels = self._obj_with_exclusions._get_axis(agg_axis) | ||
output_keys = self._decide_output_index( | ||
output, index=index, columns=agg_labels | ||
) | ||
result = DataFrame(output, columns=output_keys) | ||
|
||
output_keys = self._decide_output_index(output, agg_labels) | ||
if names: | ||
result.columns = names | ||
|
||
if not self.as_index: | ||
result = DataFrame(output, columns=output_keys) | ||
self._insert_inaxis_grouper_inplace(result) | ||
result = result._consolidate() | ||
else: | ||
index = self.grouper.result_index | ||
result = DataFrame(output, index=index, columns=output_keys) | ||
result = result.reset_index(drop=True) | ||
|
||
if self.axis == 1: | ||
result = result.T | ||
|
@@ -1571,24 +1481,6 @@ def _wrap_aggregated_output(self, output, names=None): | |
def _wrap_transformed_output(self, output, names=None): | ||
return DataFrame(output, index=self.obj.index) | ||
|
||
def _wrap_agged_blocks(self, items, blocks): | ||
if not self.as_index: | ||
index = np.arange(blocks[0].values.shape[-1]) | ||
mgr = BlockManager(blocks, [items, index]) | ||
result = DataFrame(mgr) | ||
|
||
self._insert_inaxis_grouper_inplace(result) | ||
result = result._consolidate() | ||
else: | ||
index = self.grouper.result_index | ||
mgr = BlockManager(blocks, [items, index]) | ||
result = DataFrame(mgr) | ||
|
||
if self.axis == 1: | ||
result = result.T | ||
|
||
return self._reindex_output(result)._convert(datetime=True) | ||
|
||
def _iterate_column_groupbys(self): | ||
for i, colname in enumerate(self._selected_obj.columns): | ||
yield colname, SeriesGroupBy( | ||
|
@@ -1616,20 +1508,27 @@ def count(self): | |
DataFrame | ||
Count of values within each group. | ||
""" | ||
data, _ = self._get_data_to_aggregate() | ||
output = OrderedDict() | ||
names = [] | ||
|
||
# TODO: dispatch to _cython_agg_general instead of custom looping | ||
# TODO: refactor with series logic | ||
ids, _, ngroups = self.grouper.group_info | ||
mask = ids != -1 | ||
|
||
val = ( | ||
(mask & ~_isna_ndarraylike(np.atleast_2d(blk.get_values()))) | ||
for blk in data.blocks | ||
) | ||
loc = (blk.mgr_locs for blk in data.blocks) | ||
if self.axis == 0: | ||
iter_obj = self._obj_with_exclusions | ||
else: | ||
iter_obj = self._obj_with_exclusions.T | ||
|
||
counter = partial(lib.count_level_2d, labels=ids, max_bin=ngroups, axis=1) | ||
blk = map(make_block, map(counter, val), loc) | ||
for index, (name, obj) in enumerate(iter_obj.items()): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't the ideal way to do this, but to keep the scope of the PR minimum and get tests passing I put this iteration directly in this method. This really should be shared by a few aggregation functions, but I believe is currently blocked by #25610 and #21668, amongst possibly others Would prefer to address comprehensively in a follow up |
||
mask = (ids != -1) & ~isna(obj) | ||
ids = ensure_platform_int(ids) | ||
minlength = ngroups or 0 | ||
out = np.bincount(ids[mask], minlength=minlength) | ||
output[index] = out | ||
names.append(name) | ||
|
||
return self._wrap_agged_blocks(data.items, list(blk)) | ||
return self._wrap_aggregated_output(output, names=names) | ||
|
||
def nunique(self, dropna=True): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -692,7 +692,7 @@ def __iter__(self): | |
Generator yielding sequence of (name, subsetted object) | ||
for each group | ||
""" | ||
return self.grouper.get_iterator(self.obj, axis=self.axis) | ||
return self.grouper.get_iterator(self._selected_obj, axis=self.axis) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was rather surprising that I had to change here, but getting rid of the block code seemed to change the behavior of the following snippet: In [14]: df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])
In [15]: df.groupby("g").transform("first")
Out[15]:
a a
0 2 3
1 5 6 Without referencing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is surprising. Is it possible that it is wrong now but untested? (i.e. could be fixed independently) |
||
|
||
@Appender( | ||
_apply_docs["template"].format( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -300,7 +300,7 @@ def test_observed(observed): | |
exp_index = CategoricalIndex( | ||
list("ab"), name="cat", categories=list("abc"), ordered=True | ||
) | ||
expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index) | ||
expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was coercing to float with the block code but I don't think that was necessarily desired; fits in an int with new impl There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though we might not want this either. Here is code to reproduce: In [8]: d = {
...: "ints": [1, 1, 2, 2],
...: "val": [10, 20, 30, 40],
...: }
...: df = pd.DataFrame(d)
In [8]: df.groupby(list("abab")).mean()
Out[7]:
ints val
a 1.5 20
b 1.5 30 A typical call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My intuition is that small changes like this are inevitable when changing from block-wise to column-wise. Definitely happened with arithmetic changeover. |
||
if not observed: | ||
index = CategoricalIndex( | ||
list("abc"), name="cat", categories=list("abc"), ordered=True | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
troubleshooting this chunk of code may be irrelevant if its getting ripped out, but FWIW: there are cases, particularly via L194, where we get here with
result
that is a DataFrame instead of an ndarray/EA. In those cases, the make_block call below raises.