diff --git a/pandas/core/arrays/_mixins.py b/pandas/core/arrays/_mixins.py
index b1537fbf2767d..b037f278872a9 100644
--- a/pandas/core/arrays/_mixins.py
+++ b/pandas/core/arrays/_mixins.py
@@ -4,6 +4,7 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    Iterator,
     Literal,
     Sequence,
     TypeVar,
@@ -28,7 +29,11 @@
     npt,
     type_t,
 )
-from pandas.compat import pa_version_under2p0
+from pandas.compat import (
+    pa_version_under1p01,
+    pa_version_under2p0,
+    pa_version_under5p0,
+)
 from pandas.errors import AbstractMethodError
 from pandas.util._decorators import doc
 from pandas.util._validators import (
@@ -38,7 +43,10 @@
 )
 
 from pandas.core.dtypes.common import (
+    is_bool_dtype,
     is_dtype_equal,
+    is_integer,
+    is_scalar,
     pandas_dtype,
 )
 from pandas.core.dtypes.dtypes import (
@@ -46,7 +54,10 @@
     ExtensionDtype,
     PeriodDtype,
 )
-from pandas.core.dtypes.missing import array_equivalent
+from pandas.core.dtypes.missing import (
+    array_equivalent,
+    isna,
+)
 
 from pandas.core import missing
 from pandas.core.algorithms import (
@@ -65,10 +76,11 @@
     "NDArrayBackedExtensionArrayT", bound="NDArrayBackedExtensionArray"
 )
 
-if TYPE_CHECKING:
-
+if not pa_version_under1p01:
     import pyarrow as pa
+    import pyarrow.compute as pc
 
+if TYPE_CHECKING:
     from pandas._typing import (
         NumpySorter,
         NumpyValueArrayLike,
@@ -607,3 +619,206 @@ def _concat_same_type(
         chunks = [array for ea in to_concat for array in ea._data.iterchunks()]
         arr = pa.chunked_array(chunks)
         return cls(arr)
+
+    def __setitem__(self, key: int | slice | np.ndarray, value: Any) -> None:
+        """Set one or more values inplace.
+
+        Parameters
+        ----------
+        key : int, ndarray, or slice
+            When called from, e.g. ``Series.__setitem__``, ``key`` will be
+            one of
+
+            * scalar int
+            * ndarray of integers.
+            * boolean ndarray
+            * slice object
+
+        value : ExtensionDtype.type, Sequence[ExtensionDtype.type], or object
+            value or values to be set at ``key``.
+
+        Returns
+        -------
+        None
+
+        Raises
+        ------
+        ValueError
+            If ``value`` is not a scalar and its length differs from the
+            number of positions selected by ``key``.
+        """
+        key = check_array_indexer(self, key)
+        indices = self._indexing_key_to_indices(key)
+        value = self._maybe_convert_setitem_value(value)
+
+        # Chunks are visited in positional order, so the target positions
+        # (and, below, their values) must be sorted the same way.
+        argsort = np.argsort(indices)
+        indices = indices[argsort]
+
+        if is_scalar(value):
+            value = np.broadcast_to(value, len(indices))
+        elif len(indices) != len(value):
+            raise ValueError("Length of indexer and values mismatch")
+        else:
+            value = np.asarray(value)[argsort]
+
+        self._data = self._set_via_chunk_iteration(indices=indices, value=value)
+
+    def _indexing_key_to_indices(
+        self, key: int | slice | np.ndarray
+    ) -> npt.NDArray[np.intp]:
+        """
+        Convert indexing key for self into positional indices.
+
+        Parameters
+        ----------
+        key : int | slice | np.ndarray
+
+        Returns
+        -------
+        npt.NDArray[np.intp]
+        """
+        n = len(self)
+        if isinstance(key, slice):
+            indices = np.arange(n)[key]
+        elif is_integer(key):
+            indices = np.arange(n)[[key]]  # type: ignore[index]
+        elif is_bool_dtype(key):
+            key = np.asarray(key)
+            if len(key) != n:
+                raise ValueError("Length of indexer and values mismatch")
+            indices = key.nonzero()[0]
+        else:
+            key = np.asarray(key)
+            indices = np.arange(n)[key]
+        return indices
+
+    def _maybe_convert_setitem_value(self, value):
+        """Maybe convert value to be pyarrow compatible."""
+        raise NotImplementedError()
+
+    def _set_via_chunk_iteration(
+        self, indices: npt.NDArray[np.intp], value: npt.NDArray[Any]
+    ) -> pa.ChunkedArray:
+        """
+        Loop through the array chunks and set the new values while
+        leaving the chunking layout unchanged.
+        """
+        chunk_indices = self._indices_to_chunk_indices(indices)
+        new_data = list(self._data.iterchunks())
+
+        for i, c_ind in enumerate(chunk_indices):
+            n = len(c_ind)
+            if n == 0:
+                continue
+            # peel off the leading values that belong to this chunk
+            c_value, value = value[:n], value[n:]
+            new_data[i] = self._replace_with_indices(new_data[i], c_ind, c_value)
+
+        return pa.chunked_array(new_data)
+
+    def _indices_to_chunk_indices(
+        self, indices: npt.NDArray[np.intp]
+    ) -> Iterator[npt.NDArray[np.intp]]:
+        """
+        Convert *sorted* indices for self into an iterator of ndarrays
+        each containing the indices *within* each chunk of the
+        underlying ChunkedArray.
+
+        Parameters
+        ----------
+        indices : npt.NDArray[np.intp]
+            Position indices for the underlying ChunkedArray.
+
+        Returns
+        -------
+        Generator yielding positional indices for each chunk
+
+        Notes
+        -----
+        Assumes that indices is sorted. Caller is responsible for sorting.
+        """
+        for start, stop in self._chunk_positional_ranges():
+            if len(indices) == 0 or stop <= indices[0]:
+                yield np.array([], dtype=np.intp)
+            else:
+                n = int(np.searchsorted(indices, stop, side="left"))
+                c_ind = indices[:n] - start
+                indices = indices[n:]
+                yield c_ind
+
+    def _chunk_positional_ranges(self) -> tuple[tuple[int, int], ...]:
+        """
+        Return a tuple of tuples each containing the left (inclusive)
+        and right (exclusive) positional bounds of each chunk's values
+        within the underlying ChunkedArray.
+
+        Returns
+        -------
+        tuple[tuple]
+        """
+        ranges = []
+        stop = 0
+        for c in self._data.iterchunks():
+            start, stop = stop, stop + len(c)
+            ranges.append((start, stop))
+        return tuple(ranges)
+
+    @classmethod
+    def _replace_with_indices(
+        cls,
+        chunk: pa.Array,
+        indices: npt.NDArray[np.intp],
+        value: npt.NDArray[Any],
+    ) -> pa.Array:
+        """
+        Replace items selected with a set of positional indices.
+
+        Analogous to pyarrow.compute.replace_with_mask, except that replacement
+        positions are identified via indices rather than a mask.
+
+        Parameters
+        ----------
+        chunk : pa.Array
+        indices : npt.NDArray[np.intp]
+            Positions within ``chunk``. Assumed sorted and free of
+            duplicates; the contiguity check below relies on that.
+        value : npt.NDArray[Any]
+            Replacement value(s).
+
+        Returns
+        -------
+        pa.Array
+        """
+        n = len(indices)
+
+        if n == 0:
+            return chunk
+
+        start, stop = indices[[0, -1]]
+
+        if (stop - start) == (n - 1):
+            # fast path for a contiguous set of indices
+            arrays = [
+                chunk[:start],
+                pa.array(value, type=chunk.type),
+                chunk[stop + 1 :],
+            ]
+            arrays = [arr for arr in arrays if len(arr)]
+            if len(arrays) == 1:
+                return arrays[0]
+            return pa.concat_arrays(arrays)
+
+        mask = np.zeros(len(chunk), dtype=np.bool_)
+        mask[indices] = True
+
+        if pa_version_under5p0:
+            arr = chunk.to_numpy(zero_copy_only=False)
+            arr[mask] = value
+            return pa.array(arr, type=chunk.type)
+
+        if isna(value).all():
+            return pc.if_else(mask, None, chunk)
+
+        return pc.replace_with_mask(chunk, mask, value)
diff --git a/pandas/core/arrays/string_arrow.py b/pandas/core/arrays/string_arrow.py
index 4af4b501fd5b0..ac5bfe32ed5f6 100644
--- a/pandas/core/arrays/string_arrow.py
+++ b/pandas/core/arrays/string_arrow.py
@@ -30,7 +30,6 @@
     pa_version_under2p0,
     pa_version_under3p0,
     pa_version_under4p0,
-    pa_version_under5p0,
 )
 from pandas.util._decorators import doc
 
@@ -343,147 +342,20 @@ def insert(self, loc: int, item):
             raise TypeError("Scalar must be NA or str")
         return super().insert(loc, item)
 
-    def __setitem__(self, key: int | slice | np.ndarray, value: Any) -> None:
-        """Set one or more values inplace.
-
-        Parameters
-        ----------
-        key : int, ndarray, or slice
-            When called from, e.g. ``Series.__setitem__``, ``key`` will be
-            one of
-
-            * scalar int
-            * ndarray of integers.
-            * boolean ndarray
-            * slice object
-
-        value : ExtensionDtype.type, Sequence[ExtensionDtype.type], or object
-            value or values to be set of ``key``.
-
-        Returns
-        -------
-        None
-        """
-        key = check_array_indexer(self, key)
-        indices = self._key_to_indices(key)
-
+    def _maybe_convert_setitem_value(self, value):
+        """Maybe convert value to be pyarrow compatible."""
         if is_scalar(value):
             if isna(value):
                 value = None
             elif not isinstance(value, str):
                 raise ValueError("Scalar must be NA or str")
-            value = np.broadcast_to(value, len(indices))
         else:
             value = np.array(value, dtype=object, copy=True)
-            for i, v in enumerate(value):
-                if isna(v):
-                    value[i] = None
-                elif not isinstance(v, str):
+            value[isna(value)] = None
+            for v in value:
+                if not (v is None or isinstance(v, str)):
                     raise ValueError("Scalar must be NA or str")
-
-        if len(indices) != len(value):
-            raise ValueError("Length of indexer and values mismatch")
-
-        argsort = np.argsort(indices)
-        indices = indices[argsort]
-        value = value[argsort]
-
-        self._data = self._set_via_chunk_iteration(indices=indices, value=value)
-
-    def _key_to_indices(self, key: int | slice | np.ndarray) -> npt.NDArray[np.intp]:
-        """Convert indexing key for self to positional indices."""
-        if isinstance(key, slice):
-            indices = np.arange(len(self))[key]
-        elif is_bool_dtype(key):
-            key = np.asarray(key)
-            if len(key) != len(self):
-                raise ValueError("Length of indexer and values mismatch")
-            indices = key.nonzero()[0]
-        else:
-            key_arr = np.array([key]) if is_integer(key) else np.asarray(key)
-            indices = np.arange(len(self))[key_arr]
-        return indices
-
-    def _set_via_chunk_iteration(
-        self, indices: npt.NDArray[np.intp], value: npt.NDArray[Any]
-    ) -> pa.ChunkedArray:
-        """
-        Loop through the array chunks and set the new values while
-        leaving the chunking layout unchanged.
-        """
-
-        chunk_indices = self._within_chunk_indices(indices)
-        new_data = []
-
-        for i, chunk in enumerate(self._data.iterchunks()):
-
-            c_ind = chunk_indices[i]
-            n = len(c_ind)
-            c_value, value = value[:n], value[n:]
-
-            if n == 1:
-                # fast path
-                chunk = self._set_single_index_in_chunk(chunk, c_ind[0], c_value[0])
-            elif n > 0:
-                mask = np.zeros(len(chunk), dtype=np.bool_)
-                mask[c_ind] = True
-                if not pa_version_under5p0:
-                    if c_value is None or isna(np.array(c_value)).all():
-                        chunk = pc.if_else(mask, None, chunk)
-                    else:
-                        chunk = pc.replace_with_mask(chunk, mask, c_value)
-                else:
-                    # The pyarrow compute functions were added in
-                    # version 5.0. For prior versions we implement
-                    # our own by converting to numpy and back.
-                    chunk = chunk.to_numpy(zero_copy_only=False)
-                    chunk[mask] = c_value
-                    chunk = pa.array(chunk, type=pa.string())
-
-            new_data.append(chunk)
-
-        return pa.chunked_array(new_data)
-
-    @staticmethod
-    def _set_single_index_in_chunk(chunk: pa.Array, index: int, value: Any) -> pa.Array:
-        """Set a single position in a pyarrow array."""
-        assert is_scalar(value)
-        return pa.concat_arrays(
-            [
-                chunk[:index],
-                pa.array([value], type=pa.string()),
-                chunk[index + 1 :],
-            ]
-        )
-
-    def _within_chunk_indices(
-        self, indices: npt.NDArray[np.intp]
-    ) -> list[npt.NDArray[np.intp]]:
-        """
-        Convert indices for self into a list of ndarrays each containing
-        the indices *within* each chunk of the chunked array.
-        """
-        # indices must be sorted
-        chunk_indices = []
-        for start, stop in self._chunk_ranges():
-            if len(indices) == 0 or indices[0] >= stop:
-                c_ind = np.array([], dtype=np.intp)
-            else:
-                n = int(np.searchsorted(indices, stop, side="left"))
-                c_ind = indices[:n] - start
-                indices = indices[n:]
-            chunk_indices.append(c_ind)
-        return chunk_indices
-
-    def _chunk_ranges(self) -> list[tuple]:
-        """
-        Return a list of tuples each containing the left (inclusive)
-        and right (exclusive) bounds of each chunk.
-        """
-        lengths = [len(c) for c in self._data.iterchunks()]
-        stops = np.cumsum(lengths)
-        starts = np.concatenate([[0], stops[:-1]])
-        return list(zip(starts, stops))
+        return value
 
     def take(
         self,