diff --git a/doc/source/whatsnew/v2.1.0.rst b/doc/source/whatsnew/v2.1.0.rst
index c50e031c815a6..b869859e0ca80 100644
--- a/doc/source/whatsnew/v2.1.0.rst
+++ b/doc/source/whatsnew/v2.1.0.rst
@@ -669,6 +669,7 @@ I/O
 ^^^
 - :meth:`DataFrame.to_orc` now raising ``ValueError`` when non-default :class:`Index` is given (:issue:`51828`)
 - :meth:`DataFrame.to_sql` now raising ``ValueError`` when the name param is left empty while using SQLAlchemy to connect (:issue:`52675`)
+- Added ``filters`` parameter to :func:`read_parquet` to filter out data, compatible with both engines (:issue:`53212`)
 - Bug in :func:`json_normalize`, fix json_normalize cannot parse metadata fields list type (:issue:`37782`)
 - Bug in :func:`read_csv` where it would error when ``parse_dates`` was set to a list or dictionary with ``engine="pyarrow"`` (:issue:`47961`)
 - Bug in :func:`read_csv`, with ``engine="pyarrow"`` erroring when specifying a ``dtype`` with ``index_col`` (:issue:`53229`)
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 61112542fb9d8..90d59b0dfcfc8 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -228,6 +228,7 @@ def read(
         self,
         path,
         columns=None,
+        filters=None,
         use_nullable_dtypes: bool = False,
         dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
         storage_options: StorageOptions | None = None,
@@ -257,7 +258,11 @@ def read(
         )
         try:
             pa_table = self.api.parquet.read_table(
-                path_or_handle, columns=columns, filesystem=filesystem, **kwargs
+                path_or_handle,
+                columns=columns,
+                filesystem=filesystem,
+                filters=filters,
+                **kwargs,
             )
             result = pa_table.to_pandas(**to_pandas_kwargs)
 
@@ -335,6 +340,7 @@ def read(
         self,
         path,
         columns=None,
+        filters=None,
         storage_options: StorageOptions | None = None,
         filesystem=None,
         **kwargs,
@@ -375,7 +381,7 @@ def read(
         try:
             parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
 
-            return parquet_file.to_pandas(columns=columns, **kwargs)
+            return parquet_file.to_pandas(columns=columns, filters=filters, **kwargs)
         finally:
             if handles is not None:
                 handles.close()
@@ -487,6 +493,7 @@ def read_parquet(
     use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
     dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
     filesystem: Any = None,
+    filters: list[tuple] | list[list[tuple]] | None = None,
     **kwargs,
 ) -> DataFrame:
     """
@@ -517,7 +524,6 @@ def read_parquet(
         if you wish to use its implementation.
     columns : list, default=None
         If not None, only these columns will be read from the file.
-    {storage_options}
 
         .. versionadded:: 1.3.0
 
@@ -550,6 +556,24 @@ def read_parquet(
 
         .. versionadded:: 2.1.0
 
+    filters : List[Tuple] or List[List[Tuple]], default None
+        To filter out data.
+        Filter syntax: ``[[(column, op, val), ...], ...]``,
+        where ``op`` is one of ``==``, ``=``, ``>``, ``>=``, ``<``, ``<=``, ``!=``, ``in``, ``not in``.
+        The innermost tuples are transposed into a set of filters applied
+        through an ``AND`` operation.
+        The outer list combines these sets of filters through an ``OR``
+        operation.
+        A single list of tuples can also be used, meaning that no ``OR``
+        operation between sets of filters is to be conducted.
+
+        Using this argument will NOT result in row-wise filtering of the final
+        partitions unless ``engine="pyarrow"`` is also specified. For
+        other engines, filtering is only performed at the partition level, that is,
+        to prevent the loading of some row groups and/or files.
+
+        .. versionadded:: 2.1.0
+
     **kwargs
         Any additional kwargs are passed to the engine.
@@ -632,6 +656,7 @@ def read_parquet(
     return impl.read(
         path,
         columns=columns,
+        filters=filters,
         storage_options=storage_options,
         use_nullable_dtypes=use_nullable_dtypes,
         dtype_backend=dtype_backend,
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index a1bc470bdb7a2..5399cabb61ec3 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -426,6 +426,25 @@ def test_read_columns(self, engine):
             df, engine, expected=expected, read_kwargs={"columns": ["string"]}
         )
 
+    def test_read_filters(self, engine, tmp_path):
+        df = pd.DataFrame(
+            {
+                "int": list(range(4)),
+                "part": list("aabb"),
+            }
+        )
+
+        expected = pd.DataFrame({"int": [0, 1]})
+        check_round_trip(
+            df,
+            engine,
+            path=tmp_path,
+            expected=expected,
+            write_kwargs={"partition_cols": ["part"]},
+            read_kwargs={"filters": [("part", "==", "a")], "columns": ["int"]},
+            repeat=1,
+        )
+
     def test_write_index(self, engine, using_copy_on_write, request):
         check_names = engine != "fastparquet"
         if using_copy_on_write and engine == "fastparquet":
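
For context, a minimal usage sketch of the behaviour this patch enables. It is not part of the change itself: the ``sales_ds`` directory and the ``year``/``amount`` columns are made up for illustration, and it assumes the patch is applied on a pandas build with pyarrow installed.

# Sketch only: dataset path and column names are hypothetical.
import pandas as pd

df = pd.DataFrame(
    {
        "year": [2022, 2022, 2023, 2023],
        "amount": [10.0, 20.0, 30.0, 40.0],
    }
)
# Write a partitioned dataset so the filter can prune partitions.
df.to_parquet("sales_ds", partition_cols=["year"])

# Inner tuples are combined with AND; sublists in the outer list are ORed.
# Per the docstring added above, row-wise filtering applies with
# engine="pyarrow"; other engines only prune row groups / partition files.
result = pd.read_parquet(
    "sales_ds",
    engine="pyarrow",
    filters=[[("year", "==", 2023), ("amount", ">", 30.0)]],
)
print(result)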