ENH: explicit filters parameter in pd.read_parquet #53212

Merged · 9 commits · Aug 2, 2023

1 change: 1 addition & 0 deletions doc/source/whatsnew/v2.1.0.rst
@@ -669,6 +669,7 @@ I/O
^^^
- :meth:`DataFrame.to_orc` now raises ``ValueError`` when a non-default :class:`Index` is given (:issue:`51828`)
- :meth:`DataFrame.to_sql` now raises ``ValueError`` when the ``name`` param is left empty while using SQLAlchemy to connect (:issue:`52675`)
- Added ``filters`` parameter to :func:`read_parquet` to filter out data, supported by both engines (:issue:`53212`)
- Bug in :func:`json_normalize` where it could not parse metadata fields of list type (:issue:`37782`)
- Bug in :func:`read_csv` where it would error when ``parse_dates`` was set to a list or dictionary with ``engine="pyarrow"`` (:issue:`47961`)
- Bug in :func:`read_csv`, with ``engine="pyarrow"`` erroring when specifying a ``dtype`` with ``index_col`` (:issue:`53229`)
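
For reference, a minimal usage sketch of the new parameter (the file name, column name, and value are hypothetical, not taken from the PR):

```python
import pandas as pd

# Keep only rows whose "year" column equals 2020. With the pyarrow
# engine this filters individual rows; "example.parquet" is a
# placeholder path.
df = pd.read_parquet(
    "example.parquet",
    engine="pyarrow",
    filters=[("year", "==", 2020)],
)
```
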
31 changes: 28 additions & 3 deletions pandas/io/parquet.py
@@ -228,6 +228,7 @@ def read(
self,
path,
columns=None,
filters=None,
use_nullable_dtypes: bool = False,
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
storage_options: StorageOptions | None = None,
@@ -257,7 +258,11 @@ def read(
)
try:
pa_table = self.api.parquet.read_table(
path_or_handle, columns=columns, filesystem=filesystem, **kwargs
path_or_handle,
columns=columns,
filesystem=filesystem,
filters=filters,
**kwargs,
)
result = pa_table.to_pandas(**to_pandas_kwargs)

@@ -335,6 +340,7 @@ def read(
self,
path,
columns=None,
filters=None,
storage_options: StorageOptions | None = None,
filesystem=None,
**kwargs,
@@ -375,7 +381,7 @@

try:
parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
return parquet_file.to_pandas(columns=columns, **kwargs)
return parquet_file.to_pandas(columns=columns, filters=filters, **kwargs)
finally:
if handles is not None:
handles.close()
@@ -487,6 +493,7 @@ def read_parquet(
use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
filesystem: Any = None,
filters: list[tuple] | list[list[tuple]] | None = None,
**kwargs,
) -> DataFrame:
"""
Expand Down Expand Up @@ -517,7 +524,6 @@ def read_parquet(
if you wish to use its implementation.
columns : list, default=None
If not None, only these columns will be read from the file.

{storage_options}

.. versionadded:: 1.3.0
@@ -550,6 +556,24 @@

.. versionadded:: 2.1.0

filters : List[Tuple] or List[List[Tuple]], default None
To filter out data.
Filter syntax: ``[[(column, op, val), ...], ...]``,
where ``op`` is one of ``==``, ``=``, ``>``, ``>=``, ``<``, ``<=``, ``!=``, ``in``, ``not in``.
The innermost tuples are transposed into a set of filters applied
through an `AND` operation.
The outer list combines these sets of filters through an `OR`
operation.
A single list of tuples can also be used, meaning that no `OR`
operation between sets of filters is to be conducted.

Using this argument will NOT result in row-wise filtering of the final
partitions unless ``engine="pyarrow"`` is also specified. For
other engines, filtering is only performed at the partition level, that is,
to prevent the loading of some row-groups and/or files.

.. versionadded:: 2.1.0

**kwargs
Any additional kwargs are passed to the engine.

@@ -632,6 +656,7 @@ def read_parquet(
return impl.read(
path,
columns=columns,
filters=filters,
storage_options=storage_options,
use_nullable_dtypes=use_nullable_dtypes,
dtype_backend=dtype_backend,
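
The `AND`/`OR` semantics described in the ``filters`` docstring above can be made concrete with a short sketch (column names and values are invented for illustration):

```python
import pandas as pd

# A flat list of tuples is a conjunction:
#   year == 2021 AND country == "DE"
and_filter = [("year", "==", 2021), ("country", "==", "DE")]

# A list of lists is a disjunction of conjunctions:
#   (year == 2021 AND country == "DE") OR (year >= 2022)
or_filter = [
    [("year", "==", 2021), ("country", "==", "DE")],
    [("year", ">=", 2022)],
]

# "data.parquet" is a placeholder path.
df = pd.read_parquet("data.parquet", filters=or_filter)
```
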
19 changes: 19 additions & 0 deletions pandas/tests/io/test_parquet.py
@@ -426,6 +426,25 @@ def test_read_columns(self, engine):
df, engine, expected=expected, read_kwargs={"columns": ["string"]}
)

def test_read_filters(self, engine, tmp_path):
df = pd.DataFrame(
{
"int": list(range(4)),
"part": list("aabb"),
}
)

expected = pd.DataFrame({"int": [0, 1]})
check_round_trip(
df,
engine,
path=tmp_path,
expected=expected,
write_kwargs={"partition_cols": ["part"]},
read_kwargs={"filters": [("part", "==", "a")], "columns": ["int"]},
repeat=1,
)

def test_write_index(self, engine, using_copy_on_write, request):
check_names = engine != "fastparquet"
if using_copy_on_write and engine == "fastparquet":
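
Outside the test harness, the behaviour exercised by ``test_read_filters`` corresponds roughly to this sketch (the path is a placeholder; with ``engine="fastparquet"`` the filter prunes whole partitions rather than individual rows):

```python
import pandas as pd

df = pd.DataFrame({"int": list(range(4)), "part": list("aabb")})

# Write a dataset partitioned on "part", then read back only the
# "a" partition; "/tmp/pq" is a placeholder directory.
df.to_parquet("/tmp/pq", partition_cols=["part"])
result = pd.read_parquet(
    "/tmp/pq",
    columns=["int"],
    filters=[("part", "==", "a")],
)
# result holds the rows where part == "a": int values 0 and 1.
```
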