From 0b4bc173e4f016d4872ba58ad4cbb33d02b560ff Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Sat, 19 Mar 2022 23:51:33 -0400 Subject: [PATCH 1/4] move ArrowStringArray.__setitem__ and related to ArrowExtensionArray --- pandas/core/arrays/_mixins.py | 219 ++++++++++++++++++++++++++++- pandas/core/arrays/string_arrow.py | 140 +----------------- 2 files changed, 221 insertions(+), 138 deletions(-) diff --git a/pandas/core/arrays/_mixins.py b/pandas/core/arrays/_mixins.py index b1537fbf2767d..127cb71c70b56 100644 --- a/pandas/core/arrays/_mixins.py +++ b/pandas/core/arrays/_mixins.py @@ -21,6 +21,7 @@ F, PositionalIndexer2D, PositionalIndexerTuple, + Scalar, ScalarIndexer, SequenceIndexer, Shape, @@ -28,7 +29,11 @@ npt, type_t, ) -from pandas.compat import pa_version_under2p0 +from pandas.compat import ( + pa_version_under1p01, + pa_version_under2p0, + pa_version_under5p0, +) from pandas.errors import AbstractMethodError from pandas.util._decorators import doc from pandas.util._validators import ( @@ -38,7 +43,10 @@ ) from pandas.core.dtypes.common import ( + is_bool_dtype, is_dtype_equal, + is_integer, + is_scalar, pandas_dtype, ) from pandas.core.dtypes.dtypes import ( @@ -46,7 +54,10 @@ ExtensionDtype, PeriodDtype, ) -from pandas.core.dtypes.missing import array_equivalent +from pandas.core.dtypes.missing import ( + array_equivalent, + isna, +) from pandas.core import missing from pandas.core.algorithms import ( @@ -65,10 +76,11 @@ "NDArrayBackedExtensionArrayT", bound="NDArrayBackedExtensionArray" ) -if TYPE_CHECKING: - +if not pa_version_under1p01: import pyarrow as pa + import pyarrow.compute as pc +if TYPE_CHECKING: from pandas._typing import ( NumpySorter, NumpyValueArrayLike, @@ -607,3 +619,202 @@ def _concat_same_type( chunks = [array for ea in to_concat for array in ea._data.iterchunks()] arr = pa.chunked_array(chunks) return cls(arr) + + def __setitem__(self, key: int | slice | np.ndarray, value: Any) -> None: + """Set one or more values inplace. + + Parameters + ---------- + key : int, ndarray, or slice + When called from, e.g. ``Series.__setitem__``, ``key`` will be + one of + + * scalar int + * ndarray of integers. + * boolean ndarray + * slice object + + value : ExtensionDtype.type, Sequence[ExtensionDtype.type], or object + value or values to be set of ``key``. + + Returns + ------- + None + """ + key = check_array_indexer(self, key) + indices = self._indexing_key_to_indices(key) + value = self._maybe_convert_setitem_value(value) + + argsort = np.argsort(indices) + indices = indices[argsort] + + if is_scalar(value): + pass + elif len(indices) != len(value): + raise ValueError("Length of indexer and values mismatch") + else: + value = np.asarray(value)[argsort] + + self._data = self._set_via_chunk_iteration(indices=indices, value=value) + + def _indexing_key_to_indices( + self, key: int | slice | np.ndarray + ) -> npt.NDArray[np.intp]: + """ + Convert indexing key for self into positional indices. + + Parameters + ---------- + key : int | slice | np.ndarray + + Returns + ------- + npt.NDArray[np.intp] + """ + n = len(self) + if isinstance(key, slice): + indices = np.arange(n)[key] + elif is_integer(key): + indices = np.arange(n)[[key]] + elif is_bool_dtype(key): + key = np.asarray(key) + if len(key) != n: + raise ValueError("Length of indexer and values mismatch") + indices = key.nonzero()[0] + else: + key = np.asarray(key) + indices = np.arange(n)[key] + return indices + + def _maybe_convert_setitem_value(self, value): + """Maybe convert value to be pyarrow compatible.""" + raise NotImplementedError() + + def _set_via_chunk_iteration( + self, indices: npt.NDArray[np.intp], value: Scalar | npt.NDArray[Any] + ) -> pa.ChunkedArray: + """ + Loop through the array chunks and set the new values while + leaving the chunking layout unchanged. + """ + chunk_indices = self._indices_to_chunk_indices(indices) + new_data = list(self._data.iterchunks()) + + for i, c_ind in enumerate(chunk_indices): + n = len(c_ind) + if n == 0: + continue + if is_scalar(value): + c_value = value + else: + c_value, value = value[:n], value[n:] + new_data[i] = self._replace_with_indices(new_data[i], c_ind, c_value) + + return pa.chunked_array(new_data) + + def _indices_to_chunk_indices( + self, indices: npt.NDArray[np.intp] + ) -> list[npt.NDArray[np.intp]]: + """ + Convert *sorted* indices for self into a list of ndarrays + each containing the indices *within* each chunk of the + underlying ChunkedArray. + + Parameters + ---------- + indices : npt.NDArray[np.intp] + Position indices for the underlying ChunkedArray. + + Returns + ------- + list[npt.NDArray[np.intp]] + + Notes + ----- + Assumes that indices is sorted. Caller is responsible for sorting. + """ + chunk_indices = [] + for start, stop in self._chunk_positional_ranges(): + if len(indices) == 0 or stop <= indices[0]: + c_ind = np.array([], dtype=np.intp) + else: + n = int(np.searchsorted(indices, stop, side="left")) + c_ind = indices[:n] - start + indices = indices[n:] + chunk_indices.append(c_ind) + return chunk_indices + + def _chunk_positional_ranges(self) -> tuple[tuple[int, int], ...]: + """ + Return a tuple of tuples each containing the left (inclusive) + and right (exclusive) positional bounds of each chunk's values + within the underlying ChunkedArray. + + Returns + ------- + tuple[tuple] + """ + ranges = [] + stop = 0 + for c in self._data.iterchunks(): + start, stop = stop, stop + len(c) + ranges.append((start, stop)) + return tuple(ranges) + + @classmethod + def _replace_with_indices( + cls, + chunk: pa.Array, + indices: npt.NDArray[np.intp], + value: Scalar | npt.NDArray[Any], + ) -> pa.Array: + """ + Replace items selected with a set of positional indices. + + Analogous to pyarrow.compute.replace_with_mask, except that replacement + positions are identified via indices rather than a mask. + + Parameters + ---------- + chunk : pa.Array + indices : npt.NDArray[np.intp] + value : Scalar | npt.NDArray[Any] + Replacement value(s). + + Returns + ------- + pa.Array + """ + n = len(indices) + + if n == 0: + return chunk + + start, stop = indices[[0, -1]] + + if (stop - start) == (n - 1): + # fast path for a contiguous set of indices + if is_scalar(value): + value = np.broadcast_to(value, len(indices)) + arrays = [ + chunk[:start], + pa.array(value, type=chunk.type), + chunk[stop + 1 :], + ] + arrays = [arr for arr in arrays if len(arr)] + if len(arrays) == 1: + return arrays[0] + return pa.concat_arrays(arrays) + + mask = np.zeros(len(chunk), dtype=np.bool_) + mask[indices] = True + + if pa_version_under5p0: + arr = chunk.to_numpy(zero_copy_only=False) + arr[mask] = value + return pa.array(arr, type=chunk.type) + + if value is None or (not is_scalar(value) and isna(value).all()): + return pc.if_else(mask, None, chunk) + + return pc.replace_with_mask(chunk, mask, value) diff --git a/pandas/core/arrays/string_arrow.py b/pandas/core/arrays/string_arrow.py index 4af4b501fd5b0..ac5bfe32ed5f6 100644 --- a/pandas/core/arrays/string_arrow.py +++ b/pandas/core/arrays/string_arrow.py @@ -30,7 +30,6 @@ pa_version_under2p0, pa_version_under3p0, pa_version_under4p0, - pa_version_under5p0, ) from pandas.util._decorators import doc @@ -343,147 +342,20 @@ def insert(self, loc: int, item): raise TypeError("Scalar must be NA or str") return super().insert(loc, item) - def __setitem__(self, key: int | slice | np.ndarray, value: Any) -> None: - """Set one or more values inplace. - - Parameters - ---------- - key : int, ndarray, or slice - When called from, e.g. ``Series.__setitem__``, ``key`` will be - one of - - * scalar int - * ndarray of integers. - * boolean ndarray - * slice object - - value : ExtensionDtype.type, Sequence[ExtensionDtype.type], or object - value or values to be set of ``key``. - - Returns - ------- - None - """ - key = check_array_indexer(self, key) - indices = self._key_to_indices(key) - + def _maybe_convert_setitem_value(self, value): + """Maybe convert value to be pyarrow compatible.""" if is_scalar(value): if isna(value): value = None elif not isinstance(value, str): raise ValueError("Scalar must be NA or str") - value = np.broadcast_to(value, len(indices)) else: value = np.array(value, dtype=object, copy=True) - for i, v in enumerate(value): - if isna(v): - value[i] = None - elif not isinstance(v, str): + value[isna(value)] = None + for v in value: + if not (v is None or isinstance(v, str)): raise ValueError("Scalar must be NA or str") - - if len(indices) != len(value): - raise ValueError("Length of indexer and values mismatch") - - argsort = np.argsort(indices) - indices = indices[argsort] - value = value[argsort] - - self._data = self._set_via_chunk_iteration(indices=indices, value=value) - - def _key_to_indices(self, key: int | slice | np.ndarray) -> npt.NDArray[np.intp]: - """Convert indexing key for self to positional indices.""" - if isinstance(key, slice): - indices = np.arange(len(self))[key] - elif is_bool_dtype(key): - key = np.asarray(key) - if len(key) != len(self): - raise ValueError("Length of indexer and values mismatch") - indices = key.nonzero()[0] - else: - key_arr = np.array([key]) if is_integer(key) else np.asarray(key) - indices = np.arange(len(self))[key_arr] - return indices - - def _set_via_chunk_iteration( - self, indices: npt.NDArray[np.intp], value: npt.NDArray[Any] - ) -> pa.ChunkedArray: - """ - Loop through the array chunks and set the new values while - leaving the chunking layout unchanged. - """ - - chunk_indices = self._within_chunk_indices(indices) - new_data = [] - - for i, chunk in enumerate(self._data.iterchunks()): - - c_ind = chunk_indices[i] - n = len(c_ind) - c_value, value = value[:n], value[n:] - - if n == 1: - # fast path - chunk = self._set_single_index_in_chunk(chunk, c_ind[0], c_value[0]) - elif n > 0: - mask = np.zeros(len(chunk), dtype=np.bool_) - mask[c_ind] = True - if not pa_version_under5p0: - if c_value is None or isna(np.array(c_value)).all(): - chunk = pc.if_else(mask, None, chunk) - else: - chunk = pc.replace_with_mask(chunk, mask, c_value) - else: - # The pyarrow compute functions were added in - # version 5.0. For prior versions we implement - # our own by converting to numpy and back. - chunk = chunk.to_numpy(zero_copy_only=False) - chunk[mask] = c_value - chunk = pa.array(chunk, type=pa.string()) - - new_data.append(chunk) - - return pa.chunked_array(new_data) - - @staticmethod - def _set_single_index_in_chunk(chunk: pa.Array, index: int, value: Any) -> pa.Array: - """Set a single position in a pyarrow array.""" - assert is_scalar(value) - return pa.concat_arrays( - [ - chunk[:index], - pa.array([value], type=pa.string()), - chunk[index + 1 :], - ] - ) - - def _within_chunk_indices( - self, indices: npt.NDArray[np.intp] - ) -> list[npt.NDArray[np.intp]]: - """ - Convert indices for self into a list of ndarrays each containing - the indices *within* each chunk of the chunked array. - """ - # indices must be sorted - chunk_indices = [] - for start, stop in self._chunk_ranges(): - if len(indices) == 0 or indices[0] >= stop: - c_ind = np.array([], dtype=np.intp) - else: - n = int(np.searchsorted(indices, stop, side="left")) - c_ind = indices[:n] - start - indices = indices[n:] - chunk_indices.append(c_ind) - return chunk_indices - - def _chunk_ranges(self) -> list[tuple]: - """ - Return a list of tuples each containing the left (inclusive) - and right (exclusive) bounds of each chunk. - """ - lengths = [len(c) for c in self._data.iterchunks()] - stops = np.cumsum(lengths) - starts = np.concatenate([[0], stops[:-1]]) - return list(zip(starts, stops)) + return value def take( self, From 9834f6589f92ca89d1384516b5bd1c976aa60c85 Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Sun, 20 Mar 2022 07:40:56 -0400 Subject: [PATCH 2/4] use generator --- pandas/core/arrays/_mixins.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pandas/core/arrays/_mixins.py b/pandas/core/arrays/_mixins.py index 127cb71c70b56..fdae9cdf9911d 100644 --- a/pandas/core/arrays/_mixins.py +++ b/pandas/core/arrays/_mixins.py @@ -4,6 +4,7 @@ from typing import ( TYPE_CHECKING, Any, + Iterator, Literal, Sequence, TypeVar, @@ -707,14 +708,14 @@ def _set_via_chunk_iteration( if is_scalar(value): c_value = value else: - c_value, value = value[:n], value[n:] + c_value, value = value[:n], value[n:] # type: ignore[index] new_data[i] = self._replace_with_indices(new_data[i], c_ind, c_value) return pa.chunked_array(new_data) def _indices_to_chunk_indices( self, indices: npt.NDArray[np.intp] - ) -> list[npt.NDArray[np.intp]]: + ) -> Iterator[npt.NDArray[np.intp]]: """ Convert *sorted* indices for self into a list of ndarrays each containing the indices *within* each chunk of the @@ -727,22 +728,20 @@ def _indices_to_chunk_indices( Returns ------- - list[npt.NDArray[np.intp]] + Generator yielding positional indices for each chunk Notes ----- Assumes that indices is sorted. Caller is responsible for sorting. """ - chunk_indices = [] for start, stop in self._chunk_positional_ranges(): if len(indices) == 0 or stop <= indices[0]: - c_ind = np.array([], dtype=np.intp) + yield np.array([], dtype=np.intp) else: n = int(np.searchsorted(indices, stop, side="left")) c_ind = indices[:n] - start indices = indices[n:] - chunk_indices.append(c_ind) - return chunk_indices + yield c_ind def _chunk_positional_ranges(self) -> tuple[tuple[int, int], ...]: """ @@ -814,7 +813,7 @@ def _replace_with_indices( arr[mask] = value return pa.array(arr, type=chunk.type) - if value is None or (not is_scalar(value) and isna(value).all()): + if value is None or (not is_scalar(value) and np.all(isna(value))): return pc.if_else(mask, None, chunk) return pc.replace_with_mask(chunk, mask, value) From 5638d1a3b0f557ba81e91014e412a32e112881b9 Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Sun, 20 Mar 2022 08:46:50 -0400 Subject: [PATCH 3/4] mypy --- pandas/core/arrays/_mixins.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pandas/core/arrays/_mixins.py b/pandas/core/arrays/_mixins.py index fdae9cdf9911d..7429b6fe25550 100644 --- a/pandas/core/arrays/_mixins.py +++ b/pandas/core/arrays/_mixins.py @@ -22,7 +22,6 @@ F, PositionalIndexer2D, PositionalIndexerTuple, - Scalar, ScalarIndexer, SequenceIndexer, Shape, @@ -650,7 +649,7 @@ def __setitem__(self, key: int | slice | np.ndarray, value: Any) -> None: indices = indices[argsort] if is_scalar(value): - pass + value = np.broadcast_to(value, len(self)) elif len(indices) != len(value): raise ValueError("Length of indexer and values mismatch") else: @@ -692,7 +691,7 @@ def _maybe_convert_setitem_value(self, value): raise NotImplementedError() def _set_via_chunk_iteration( - self, indices: npt.NDArray[np.intp], value: Scalar | npt.NDArray[Any] + self, indices: npt.NDArray[np.intp], value: npt.NDArray[Any] ) -> pa.ChunkedArray: """ Loop through the array chunks and set the new values while @@ -705,10 +704,7 @@ def _set_via_chunk_iteration( n = len(c_ind) if n == 0: continue - if is_scalar(value): - c_value = value - else: - c_value, value = value[:n], value[n:] # type: ignore[index] + c_value, value = value[:n], value[n:] new_data[i] = self._replace_with_indices(new_data[i], c_ind, c_value) return pa.chunked_array(new_data) @@ -765,7 +761,7 @@ def _replace_with_indices( cls, chunk: pa.Array, indices: npt.NDArray[np.intp], - value: Scalar | npt.NDArray[Any], + value: npt.NDArray[Any], ) -> pa.Array: """ Replace items selected with a set of positional indices. @@ -777,7 +773,7 @@ def _replace_with_indices( ---------- chunk : pa.Array indices : npt.NDArray[np.intp] - value : Scalar | npt.NDArray[Any] + value : npt.NDArray[Any] Replacement value(s). Returns @@ -793,8 +789,6 @@ def _replace_with_indices( if (stop - start) == (n - 1): # fast path for a contiguous set of indices - if is_scalar(value): - value = np.broadcast_to(value, len(indices)) arrays = [ chunk[:start], pa.array(value, type=chunk.type), @@ -813,7 +807,7 @@ def _replace_with_indices( arr[mask] = value return pa.array(arr, type=chunk.type) - if value is None or (not is_scalar(value) and np.all(isna(value))): + if isna(value).all(): return pc.if_else(mask, None, chunk) return pc.replace_with_mask(chunk, mask, value) From db33bea9eed68c046bbc96cfbe3c635178ff8dd4 Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Sun, 20 Mar 2022 09:43:06 -0400 Subject: [PATCH 4/4] mypy --- pandas/core/arrays/_mixins.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/arrays/_mixins.py b/pandas/core/arrays/_mixins.py index 7429b6fe25550..b037f278872a9 100644 --- a/pandas/core/arrays/_mixins.py +++ b/pandas/core/arrays/_mixins.py @@ -675,7 +675,7 @@ def _indexing_key_to_indices( if isinstance(key, slice): indices = np.arange(n)[key] elif is_integer(key): - indices = np.arange(n)[[key]] + indices = np.arange(n)[[key]] # type: ignore[index] elif is_bool_dtype(key): key = np.asarray(key) if len(key) != n: