REF: de-duplicate Manager concat paths #52771

Merged
merged 6 commits into from
Apr 20, 2023
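At a glance, the refactor funnels all three concatenation paths (ArrayManager, BlockManager along axis 0, BlockManager along axis 1) through a single _maybe_reindex_columns_na_proxy helper, which now also owns the copy decision and hands back plain managers instead of (manager, indexers) tuples. The following is a simplified sketch of the resulting dispatch, not the committed code: it omits the single-manager and homogeneous-dtype fast paths, and the axis-1 block-wise body is left out entirely.

from pandas.core.internals.array_manager import ArrayManager
from pandas.core.internals.concat import (
    _concat_managers_axis0,
    _concatenate_array_managers,
    _maybe_reindex_columns_na_proxy,
)


def concatenate_managers_sketch(mgrs_indexers, axes, concat_axis, copy):
    # Copies can only be needed when concatenating along axis 0; the
    # axis-1 path always produces new arrays while concatenating blocks.
    needs_copy = copy and concat_axis == 0

    # The shared helper: reindex each manager's columns (inserting
    # dtype=np.void "NA proxy" placeholders for missing labels) and copy
    # any manager that was not reindexed but still needs a copy.
    mgrs = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers, needs_copy)

    if isinstance(mgrs[0], ArrayManager):
        return _concatenate_array_managers(mgrs, axes, concat_axis)
    if concat_axis == 0:
        return _concat_managers_axis0(mgrs, axes)
    # Axis-1 block-wise concat plan stays inline in the real function.
    raise NotImplementedError("axis-1 block-wise path omitted from this sketch")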
88 changes: 34 additions & 54 deletions pandas/core/internals/concat.py
@@ -72,7 +72,7 @@


def _concatenate_array_managers(
mgrs_indexers, axes: list[Index], concat_axis: AxisInt, copy: bool
mgrs: list[Manager], axes: list[Index], concat_axis: AxisInt
) -> Manager:
"""
Concatenate array managers into one.
@@ -82,27 +82,11 @@ def _concatenate_array_managers(
mgrs_indexers : list of (ArrayManager, {axis: indexer,...}) tuples
axes : list of Index
concat_axis : int
copy : bool

Returns
-------
ArrayManager
"""
# reindex all arrays
mgrs = []
for mgr, indexers in mgrs_indexers:
axis1_made_copy = False
for ax, indexer in indexers.items():
mgr = mgr.reindex_indexer(
axes[ax], indexer, axis=ax, allow_dups=True, use_na_proxy=True
)
if ax == 1 and indexer is not None:
axis1_made_copy = True
if copy and concat_axis == 0 and not axis1_made_copy:
# for concat_axis 1 we will always get a copy through concat_arrays
mgr = mgr.copy()
mgrs.append(mgr)

if concat_axis == 1:
# concatting along the rows -> concat the reindexed arrays
# TODO(ArrayManager) doesn't yet preserve the correct dtype
@@ -192,9 +176,18 @@ def concatenate_managers(
-------
BlockManager
"""

needs_copy = copy and concat_axis == 0

# TODO(ArrayManager) this assumes that all managers are of the same type
if isinstance(mgrs_indexers[0][0], ArrayManager):
return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy)
mgrs = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers, needs_copy)
# error: Argument 1 to "_concatenate_array_managers" has incompatible
# type "List[BlockManager]"; expected "List[Union[ArrayManager,
# SingleArrayManager, BlockManager, SingleBlockManager]]"
return _concatenate_array_managers(
mgrs, axes, concat_axis # type: ignore[arg-type]
)

# Assertions disabled for performance
# for tup in mgrs_indexers:
@@ -203,7 +196,8 @@
# assert concat_axis not in indexers

if concat_axis == 0:
return _concat_managers_axis0(mgrs_indexers, axes, copy)
mgrs = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers, needs_copy)
return _concat_managers_axis0(mgrs, axes)

if len(mgrs_indexers) > 0 and mgrs_indexers[0][0].nblocks > 0:
first_dtype = mgrs_indexers[0][0].blocks[0].dtype
@@ -220,19 +214,15 @@
nb = _concat_homogeneous_fastpath(mgrs_indexers, shape, first_dtype)
return BlockManager((nb,), axes)

mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)
if len(mgrs_indexers) == 1:
mgr, indexers = mgrs_indexers[0]
# Assertion correct but disabled for perf:
# assert not indexers
if copy:
out = mgr.copy(deep=True)
else:
out = mgr.copy(deep=False)
mgrs = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers, needs_copy)

if len(mgrs) == 1:
mgr = mgrs[0]
out = mgr.copy(deep=False)
out.axes = axes
return out

concat_plan = _get_combined_plan([mgr for mgr, _ in mgrs_indexers])
concat_plan = _get_combined_plan(mgrs)

blocks = []
values: ArrayLike
@@ -277,35 +267,20 @@ def concatenate_managers(
return BlockManager(tuple(blocks), axes)


def _concat_managers_axis0(
mgrs_indexers, axes: list[Index], copy: bool
) -> BlockManager:
def _concat_managers_axis0(mgrs: list[BlockManager], axes: list[Index]) -> BlockManager:
"""
concat_managers specialized to concat_axis=0, with reindexing already
having been done in _maybe_reindex_columns_na_proxy.
"""
had_reindexers = {
i: len(mgrs_indexers[i][1]) > 0 for i in range(len(mgrs_indexers))
}
mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)

mgrs: list[BlockManager] = [x[0] for x in mgrs_indexers]

offset = 0
blocks: list[Block] = []
for i, mgr in enumerate(mgrs):
# If we already reindexed, then we definitely don't need another copy
made_copy = had_reindexers[i]

for blk in mgr.blocks:
if made_copy:
nb = blk.copy(deep=False)
elif copy:
nb = blk.copy()
else:
# by slicing instead of copy(deep=False), we get a new array
# object, see test_concat_copy
nb = blk.getitem_block(slice(None))
# We need to do getitem_block here otherwise we would be altering
# blk.mgr_locs in place, which would render it invalid. This is only
# relevant in the copy=False case.
nb = blk.getitem_block(slice(None))
nb._mgr_locs = nb._mgr_locs.add(offset)
blocks.append(nb)

@@ -316,16 +291,18 @@ def _concat_managers_axis0(


def _maybe_reindex_columns_na_proxy(
axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]]
) -> list[tuple[BlockManager, dict[int, np.ndarray]]]:
axes: list[Index],
mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]],
needs_copy: bool,
) -> list[BlockManager]:
"""
Reindex along columns so that all of the BlockManagers being concatenated
have matching columns.

Columns added in this reindexing have dtype=np.void, indicating they
should be ignored when choosing a column's final dtype.
"""
new_mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] = []
new_mgrs = []

for mgr, indexers in mgrs_indexers:
# For axis=0 (i.e. columns) we use_na_proxy and only_slice, so this
@@ -340,8 +317,11 @@ def _maybe_reindex_columns_na_proxy(
allow_dups=True,
use_na_proxy=True, # only relevant for i==0
)
new_mgrs_indexers.append((mgr, {}))
return new_mgrs_indexers
if needs_copy and not indexers:
mgr = mgr.copy()

new_mgrs.append(mgr)
return new_mgrs


def _is_homogeneous_mgr(mgr: BlockManager, first_dtype: DtypeObj) -> bool:
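As an aside on the na_proxy mechanism referenced above: the effect visible through the public API is that a column missing from one of the inputs does not drag the concatenated result's dtype toward object. A small illustration (the frames and column names are made up for this example):

import pandas as pd

left = pd.DataFrame({"a": [1, 2]})                    # no column "b"
right = pd.DataFrame({"a": [3, 4], "b": [5.0, 6.0]})  # float64 column "b"

out = pd.concat([left, right])

# The rows contributed by `left` for column "b" come from a void-dtype
# proxy, so they do not participate in dtype resolution; "b" stays
# float64 and the missing entries are simply NaN.
print(out["b"].dtype)            # float64
print(out["b"].isna().tolist())  # [True, True, False, False]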
6 changes: 5 additions & 1 deletion pandas/tests/reshape/concat/test_concat.py
@@ -61,7 +61,11 @@ def test_concat_copy(self, using_array_manager, using_copy_on_write):

if not using_copy_on_write:
for arr in result._mgr.arrays:
assert arr.base is None
assert not any(
np.shares_memory(arr, y)
for x in [df, df2, df3]
for y in x._mgr.arrays
)
else:
for arr in result._mgr.arrays:
assert arr.base is not None
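For reference, a standalone version of the strengthened check in test_concat_copy, assuming copy-on-write is disabled and a pandas version where concat still honors copy=True; the frames are made up for illustration. The point of the change is to assert the property that actually matters, no shared buffers between result and inputs, rather than the indirect arr.base is None check.

import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(8.0).reshape(4, 2), columns=["a", "b"])
df2 = pd.DataFrame(np.arange(8.0, 16.0).reshape(4, 2), columns=["a", "b"])

result = pd.concat([df, df2], copy=True)

# With copy=True and copy-on-write disabled, no array backing `result`
# should share memory with any array backing the inputs.
assert not any(
    np.shares_memory(res_arr, src_arr)
    for res_arr in result._mgr.arrays
    for src in (df, df2)
    for src_arr in src._mgr.arrays
)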