
ENH: use native filesystem (if available) for read_parquet with pyarrow engine #51609


Merged: 40 commits, Mar 14, 2023

Commits (40)
89e6f9a
ENH: Add filesystem to read_parquet/to_parquet
mroeschke Feb 24, 2023
06a2b18
Add to to_parquet
mroeschke Feb 24, 2023
35d8cbb
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Feb 24, 2023
0b07437
Bump fsspec
mroeschke Feb 24, 2023
f57a195
fix import
mroeschke Feb 24, 2023
6503a7e
Mock gcs to local for parquet
mroeschke Feb 24, 2023
16a3da2
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Feb 24, 2023
dd27604
Fix condidition, add whatsnew
mroeschke Feb 24, 2023
4ce7da8
address tests, bump gcsfs
mroeschke Feb 24, 2023
0b3bd7d
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Feb 24, 2023
e1f8912
bump s3fs
mroeschke Feb 24, 2023
49f2fc8
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Feb 25, 2023
c5166b5
Fix doc issues
mroeschke Feb 25, 2023
7ec7d75
Try without fsspec wrapper
mroeschke Feb 25, 2023
1b9ab77
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Feb 25, 2023
4ce52cd
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 8, 2023
553105c
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 10, 2023
c1161d3
Revert "Try without fsspec wrapper"
mroeschke Mar 10, 2023
ef7095a
Returns a tuple
mroeschke Mar 10, 2023
e6a61a9
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 11, 2023
08f3a30
Don't wrap in fsspec, undo deps bump
mroeschke Mar 11, 2023
7703f0f
Fix whatsnew
mroeschke Mar 11, 2023
e99d234
Add validations for filesystem
mroeschke Mar 11, 2023
f4ef416
Validate that mock filesystem is used
mroeschke Mar 11, 2023
3f0e751
Undo install.rst
mroeschke Mar 11, 2023
970a08f
Try this
mroeschke Mar 11, 2023
d79f29f
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 11, 2023
29bce3b
Make global again?
mroeschke Mar 11, 2023
440eeac
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 13, 2023
0017ae5
Try this
mroeschke Mar 13, 2023
4d3aff9
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 13, 2023
f6ac7ca
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 13, 2023
31593f0
Address review
mroeschke Mar 13, 2023
7f5dad3
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 13, 2023
646dbad
Fix test
mroeschke Mar 13, 2023
70f1e6c
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 13, 2023
96ef2fb
Use localfilesystem correctly
mroeschke Mar 13, 2023
42e614d
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 14, 2023
ae77a26
use absolute
mroeschke Mar 14, 2023
91c9b93
Merge remote-tracking branch 'upstream/main' into enh/parquet/arrow_fs
mroeschke Mar 14, 2023
3 changes: 2 additions & 1 deletion doc/source/whatsnew/v2.1.0.rst
@@ -112,7 +112,8 @@ Performance improvements
- Performance improvement in :meth:`DataFrame.clip` and :meth:`Series.clip` (:issue:`51472`)
- Performance improvement in :meth:`DataFrame.first_valid_index` and :meth:`DataFrame.last_valid_index` for extension array dtypes (:issue:`51549`)
- Performance improvement in :meth:`DataFrame.where` when ``cond`` is backed by an extension dtype (:issue:`51574`)
- Performance improvement in :meth:`read_orc` when reading a remote URI file path. (:issue:`51609`)
- Performance improvement in :func:`read_orc` when reading a remote URI file path. (:issue:`51609`)
- Performance improvement in :func:`read_parquet` and :meth:`DataFrame.to_parquet` when reading a remote file with ``engine="pyarrow"`` (:issue:`51609`)
- Performance improvement in :meth:`MultiIndex.sortlevel` when ``ascending`` is a list (:issue:`51612`)
- Performance improvement in :meth:`~arrays.ArrowExtensionArray.isna` when array has zero nulls or is all nulls (:issue:`51630`)
- Performance improvement when parsing strings to ``boolean[pyarrow]`` dtype (:issue:`51730`)
99 changes: 88 additions & 11 deletions pandas/io/parquet.py
@@ -90,12 +90,39 @@ def _get_path_or_handle(
]:
"""File handling for PyArrow."""
path_or_handle = stringify_path(path)
if fs is not None:
pa_fs = import_optional_dependency("pyarrow.fs", errors="ignore")
fsspec = import_optional_dependency("fsspec", errors="ignore")
if pa_fs is None and fsspec is None:
raise ValueError(
f"filesystem must be a pyarrow or fsspec FileSystem, "
f"not a {type(fs).__name__}"
)
elif (pa_fs is not None and not isinstance(fs, pa_fs.FileSystem)) and (
fsspec is not None and not isinstance(fs, fsspec.spec.AbstractFileSystem)
):
raise ValueError(
f"filesystem must be a pyarrow or fsspec FileSystem, "
f"not a {type(fs).__name__}"
)
elif pa_fs is not None and isinstance(fs, pa_fs.FileSystem) and storage_options:
raise NotImplementedError(
"storage_options not supported with a pyarrow FileSystem."
)
if is_fsspec_url(path_or_handle) and fs is None:
fsspec = import_optional_dependency("fsspec")
if storage_options is None:
pa = import_optional_dependency("pyarrow")
pa_fs = import_optional_dependency("pyarrow.fs")

fs, path_or_handle = fsspec.core.url_to_fs(
path_or_handle, **(storage_options or {})
)
try:
fs, path_or_handle = pa_fs.FileSystem.from_uri(path)
except (TypeError, pa.ArrowInvalid):
pass
if fs is None:
fsspec = import_optional_dependency("fsspec")
fs, path_or_handle = fsspec.core.url_to_fs(
path_or_handle, **(storage_options or {})
)
elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
# can't write to a remote url
# without making use of fsspec at the moment
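
As a rough standalone sketch (not the pandas internals verbatim; the helper name is illustrative, and pyarrow plus fsspec are assumed to be installed), the resolution order this hunk applies to an fsspec-style URL when no explicit filesystem is passed looks roughly like this:

def _resolve_filesystem(path, filesystem=None, storage_options=None):
    # An explicit pyarrow.fs or fsspec filesystem always wins.
    if filesystem is not None:
        return filesystem, path
    if storage_options is None:
        import pyarrow as pa
        import pyarrow.fs as pa_fs

        try:
            # Prefer a native pyarrow filesystem, e.g. S3FileSystem for "s3://" URIs.
            return pa_fs.FileSystem.from_uri(path)
        except (TypeError, pa.ArrowInvalid):
            # Scheme not handled natively; fall through to fsspec.
            pass
    import fsspec

    # fsspec also returns a (filesystem, stripped_path) pair.
    return fsspec.core.url_to_fs(path, **(storage_options or {}))
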
@@ -173,6 +200,7 @@ def write(
index: bool | None = None,
storage_options: StorageOptions = None,
partition_cols: list[str] | None = None,
filesystem=None,
**kwargs,
) -> None:
self.validate_dataframe(df)
@@ -183,9 +211,9 @@

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
path_or_handle, handles, filesystem = _get_path_or_handle(
path,
kwargs.pop("filesystem", None),
filesystem,
storage_options=storage_options,
mode="wb",
is_dir=partition_cols is not None,
@@ -207,12 +235,17 @@
path_or_handle,
compression=compression,
partition_cols=partition_cols,
filesystem=filesystem,
**kwargs,
)
else:
# write to single output file
self.api.parquet.write_table(
table, path_or_handle, compression=compression, **kwargs
table,
path_or_handle,
compression=compression,
filesystem=filesystem,
**kwargs,
)
finally:
if handles is not None:
@@ -225,6 +258,7 @@ def read(
use_nullable_dtypes: bool = False,
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
storage_options: StorageOptions = None,
filesystem=None,
**kwargs,
) -> DataFrame:
kwargs["use_pandas_metadata"] = True
@@ -242,15 +276,15 @@
if manager == "array":
to_pandas_kwargs["split_blocks"] = True # type: ignore[assignment]

path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
path_or_handle, handles, filesystem = _get_path_or_handle(
path,
kwargs.pop("filesystem", None),
filesystem,
storage_options=storage_options,
mode="rb",
)
try:
pa_table = self.api.parquet.read_table(
path_or_handle, columns=columns, **kwargs
path_or_handle, columns=columns, filesystem=filesystem, **kwargs
)
result = pa_table.to_pandas(**to_pandas_kwargs)

@@ -279,6 +313,7 @@ def write(
index=None,
partition_cols=None,
storage_options: StorageOptions = None,
filesystem=None,
**kwargs,
) -> None:
self.validate_dataframe(df)
@@ -294,6 +329,11 @@
if partition_cols is not None:
kwargs["file_scheme"] = "hive"

if filesystem is not None:
raise NotImplementedError(
"filesystem is not implemented for the fastparquet engine."
)

# cannot use get_handle as write() does not accept file buffers
path = stringify_path(path)
if is_fsspec_url(path):
@@ -319,7 +359,12 @@
)

def read(
self, path, columns=None, storage_options: StorageOptions = None, **kwargs
self,
path,
columns=None,
storage_options: StorageOptions = None,
filesystem=None,
**kwargs,
) -> DataFrame:
parquet_kwargs: dict[str, Any] = {}
use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
@@ -337,6 +382,10 @@ def read(
"The 'dtype_backend' argument is not supported for the "
"fastparquet engine"
)
if filesystem is not None:
raise NotImplementedError(
"filesystem is not implemented for the fastparquet engine."
)
path = stringify_path(path)
handles = None
if is_fsspec_url(path):
@@ -376,6 +425,7 @@ def to_parquet(
index: bool | None = None,
storage_options: StorageOptions = None,
partition_cols: list[str] | None = None,
filesystem: Any = None,
**kwargs,
) -> bytes | None:
"""
@@ -398,6 +448,12 @@
``io.parquet.engine`` is used. The default ``io.parquet.engine``
behavior is to try 'pyarrow', falling back to 'fastparquet' if
'pyarrow' is unavailable.

When using the ``'pyarrow'`` engine and no storage options are provided
and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
(e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
Use the filesystem keyword with an instantiated fsspec filesystem
if you wish to use its implementation.
compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
default 'snappy'. Name of the compression to use. Use ``None``
for no compression. The supported compression methods actually
@@ -420,6 +476,12 @@

.. versionadded:: 1.2.0

filesystem : fsspec or pyarrow filesystem, default None
Filesystem object to use when writing the parquet file. Only implemented
for ``engine="pyarrow"``.

.. versionadded:: 2.1.0

kwargs
Additional keyword arguments passed to the engine

@@ -440,6 +502,7 @@
index=index,
partition_cols=partition_cols,
storage_options=storage_options,
filesystem=filesystem,
**kwargs,
)

@@ -458,6 +521,7 @@ def read_parquet(
storage_options: StorageOptions = None,
use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
filesystem: Any = None,
**kwargs,
) -> DataFrame:
"""
@@ -480,6 +544,12 @@
``io.parquet.engine`` is used. The default ``io.parquet.engine``
behavior is to try 'pyarrow', falling back to 'fastparquet' if
'pyarrow' is unavailable.

When using the ``'pyarrow'`` engine and no storage options are provided
and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
(e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
Use the filesystem keyword with an instantiated fsspec filesystem
if you wish to use its implementation.
columns : list, default=None
If not None, only these columns will be read from the file.

@@ -508,6 +578,12 @@

.. versionadded:: 2.0

filesystem : fsspec or pyarrow filesystem, default None
Filesystem object to use when reading the parquet file. Only implemented
for ``engine="pyarrow"``.

.. versionadded:: 2.1.0

**kwargs
Any additional kwargs are passed to the engine.

@@ -537,5 +613,6 @@ def read_parquet(
storage_options=storage_options,
use_nullable_dtypes=use_nullable_dtypes,
dtype_backend=dtype_backend,
filesystem=filesystem,
**kwargs,
)
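
For context, a hedged usage sketch of the new filesystem keyword described in the docstrings above (bucket name, region, and credentials are placeholders, not taken from the PR; pyarrow's S3 support and s3fs are assumed to be available):

import pandas as pd
import pyarrow.fs as pa_fs

df = pd.DataFrame({"a": [1, 2, 3]})

# Pass a native pyarrow filesystem explicitly; note the path carries no "s3://" scheme.
s3 = pa_fs.S3FileSystem(region="us-east-1")
df.to_parquet("my-bucket/data.parquet", engine="pyarrow", filesystem=s3)
back = pd.read_parquet("my-bucket/data.parquet", engine="pyarrow", filesystem=s3)

# Or pass an instantiated fsspec filesystem to force the fsspec implementation.
import fsspec

fs = fsspec.filesystem("s3")
df.to_parquet("my-bucket/data.parquet", engine="pyarrow", filesystem=fs)
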
22 changes: 18 additions & 4 deletions pandas/tests/io/test_gcs.py
@@ -1,5 +1,6 @@
from io import BytesIO
import os
import pathlib
import tarfile
import zipfile

@@ -20,7 +21,7 @@


@pytest.fixture
def gcs_buffer(monkeypatch):
def gcs_buffer():
"""Emulate GCS using a binary buffer."""
import fsspec

@@ -45,7 +46,7 @@ def ls(self, path, **kwargs):

@td.skip_if_no("gcsfs")
@pytest.mark.parametrize("format", ["csv", "json", "parquet", "excel", "markdown"])
def test_to_read_gcs(gcs_buffer, format):
def test_to_read_gcs(gcs_buffer, format, monkeypatch, capsys):
"""
Test that many to/read functions support GCS.

@@ -75,8 +76,21 @@ def test_to_read_gcs(gcs_buffer, format):
df2 = read_json(path, convert_dates=["dt"])
elif format == "parquet":
pytest.importorskip("pyarrow")
df1.to_parquet(path)
df2 = read_parquet(path)
pa_fs = pytest.importorskip("pyarrow.fs")

class MockFileSystem(pa_fs.FileSystem):
@staticmethod
def from_uri(path):
print("Using pyarrow filesystem")
to_local = pathlib.Path(path.replace("gs://", "")).absolute().as_uri()
return pa_fs.LocalFileSystem(to_local)

with monkeypatch.context() as m:
m.setattr(pa_fs, "FileSystem", MockFileSystem)
df1.to_parquet(path)
df2 = read_parquet(path)
captured = capsys.readouterr()
assert captured.out == "Using pyarrow filesystem\nUsing pyarrow filesystem\n"
elif format == "markdown":
pytest.importorskip("tabulate")
df1.to_markdown(path)
60 changes: 60 additions & 0 deletions pandas/tests/io/test_parquet.py
@@ -1211,6 +1211,66 @@ def test_bytes_file_name(self, engine):
result = read_parquet(path, engine=engine)
tm.assert_frame_equal(result, df)

def test_filesystem_notimplemented(self):
pytest.importorskip("fastparquet")
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean() as path:
with pytest.raises(
NotImplementedError, match="filesystem is not implemented"
):
df.to_parquet(path, engine="fastparquet", filesystem="foo")

with tm.ensure_clean() as path:
pathlib.Path(path).write_bytes(b"foo")
with pytest.raises(
NotImplementedError, match="filesystem is not implemented"
):
read_parquet(path, engine="fastparquet", filesystem="foo")

def test_invalid_filesystem(self):
pytest.importorskip("pyarrow")
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean() as path:
with pytest.raises(
ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
):
df.to_parquet(path, engine="pyarrow", filesystem="foo")

with tm.ensure_clean() as path:
pathlib.Path(path).write_bytes(b"foo")
with pytest.raises(
ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
):
read_parquet(path, engine="pyarrow", filesystem="foo")

def test_unsupported_pa_filesystem_storage_options(self):
pa_fs = pytest.importorskip("pyarrow.fs")
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean() as path:
with pytest.raises(
NotImplementedError,
match="storage_options not supported with a pyarrow FileSystem.",
):
df.to_parquet(
path,
engine="pyarrow",
filesystem=pa_fs.LocalFileSystem(),
storage_options={"foo": "bar"},
)

with tm.ensure_clean() as path:
pathlib.Path(path).write_bytes(b"foo")
with pytest.raises(
NotImplementedError,
match="storage_options not supported with a pyarrow FileSystem.",
):
read_parquet(
path,
engine="pyarrow",
filesystem=pa_fs.LocalFileSystem(),
storage_options={"foo": "bar"},
)

def test_invalid_dtype_backend(self, engine):
msg = (
"dtype_backend numpy is invalid, only 'numpy_nullable' and "