diff --git a/doc/source/whatsnew/v2.1.0.rst b/doc/source/whatsnew/v2.1.0.rst
index 4fa5d7c6f392f..6e38983b39409 100644
--- a/doc/source/whatsnew/v2.1.0.rst
+++ b/doc/source/whatsnew/v2.1.0.rst
@@ -112,7 +112,8 @@ Performance improvements
 - Performance improvement in :meth:`DataFrame.clip` and :meth:`Series.clip` (:issue:`51472`)
 - Performance improvement in :meth:`DataFrame.first_valid_index` and :meth:`DataFrame.last_valid_index` for extension array dtypes (:issue:`51549`)
 - Performance improvement in :meth:`DataFrame.where` when ``cond`` is backed by an extension dtype (:issue:`51574`)
-- Performance improvement in :meth:`read_orc` when reading a remote URI file path. (:issue:`51609`)
+- Performance improvement in :func:`read_orc` when reading a remote URI file path. (:issue:`51609`)
+- Performance improvement in :func:`read_parquet` and :meth:`DataFrame.to_parquet` when reading and writing a remote file with ``engine="pyarrow"`` (:issue:`51609`)
 - Performance improvement in :meth:`MultiIndex.sortlevel` when ``ascending`` is a list (:issue:`51612`)
 - Performance improvement in :meth:`~arrays.ArrowExtensionArray.isna` when array has zero nulls or is all nulls (:issue:`51630`)
 - Performance improvement when parsing strings to ``boolean[pyarrow]`` dtype (:issue:`51730`)
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 0f8bf004b729a..a606cb9287d16 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -90,12 +90,39 @@ def _get_path_or_handle(
 ]:
     """File handling for PyArrow."""
     path_or_handle = stringify_path(path)
+    if fs is not None:
+        pa_fs = import_optional_dependency("pyarrow.fs", errors="ignore")
+        fsspec = import_optional_dependency("fsspec", errors="ignore")
+        if pa_fs is None and fsspec is None:
+            raise ValueError(
+                f"filesystem must be a pyarrow or fsspec FileSystem, "
+                f"not a {type(fs).__name__}"
+            )
+        elif not (pa_fs is not None and isinstance(fs, pa_fs.FileSystem)) and not (
+            fsspec is not None and isinstance(fs, fsspec.spec.AbstractFileSystem)
+        ):
+            raise ValueError(
+                f"filesystem must be a pyarrow or fsspec FileSystem, "
+                f"not a {type(fs).__name__}"
+            )
+        elif pa_fs is not None and isinstance(fs, pa_fs.FileSystem) and storage_options:
+            raise NotImplementedError(
+                "storage_options not supported with a pyarrow FileSystem."
+            )
     if is_fsspec_url(path_or_handle) and fs is None:
-        fsspec = import_optional_dependency("fsspec")
+        if storage_options is None:
+            pa = import_optional_dependency("pyarrow")
+            pa_fs = import_optional_dependency("pyarrow.fs")
 
-        fs, path_or_handle = fsspec.core.url_to_fs(
-            path_or_handle, **(storage_options or {})
-        )
+            try:
+                fs, path_or_handle = pa_fs.FileSystem.from_uri(path)
+            except (TypeError, pa.ArrowInvalid):
+                pass
+        if fs is None:
+            fsspec = import_optional_dependency("fsspec")
+            fs, path_or_handle = fsspec.core.url_to_fs(
+                path_or_handle, **(storage_options or {})
+            )
     elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
         # can't write to a remote url
         # without making use of fsspec at the moment
@@ -173,6 +200,7 @@ def write(
         index: bool | None = None,
         storage_options: StorageOptions = None,
         partition_cols: list[str] | None = None,
+        filesystem=None,
         **kwargs,
     ) -> None:
         self.validate_dataframe(df)
@@ -183,9 +211,9 @@ def write(
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
 
-        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+        path_or_handle, handles, filesystem = _get_path_or_handle(
             path,
-            kwargs.pop("filesystem", None),
+            filesystem,
             storage_options=storage_options,
             mode="wb",
             is_dir=partition_cols is not None,
@@ -207,12 +235,17 @@ def write(
                     path_or_handle,
                     compression=compression,
                     partition_cols=partition_cols,
+                    filesystem=filesystem,
                     **kwargs,
                 )
             else:
                 # write to single output file
                 self.api.parquet.write_table(
-                    table, path_or_handle, compression=compression, **kwargs
+                    table,
+                    path_or_handle,
+                    compression=compression,
+                    filesystem=filesystem,
+                    **kwargs,
                 )
         finally:
             if handles is not None:
@@ -225,6 +258,7 @@ def read(
         use_nullable_dtypes: bool = False,
         dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
         storage_options: StorageOptions = None,
+        filesystem=None,
         **kwargs,
     ) -> DataFrame:
         kwargs["use_pandas_metadata"] = True
@@ -242,15 +276,15 @@ def read(
         if manager == "array":
             to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]
 
-        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+        path_or_handle, handles, filesystem = _get_path_or_handle(
             path,
-            kwargs.pop("filesystem", None),
+            filesystem,
             storage_options=storage_options,
             mode="rb",
         )
         try:
             pa_table = self.api.parquet.read_table(
-                path_or_handle, columns=columns, **kwargs
+                path_or_handle, columns=columns, filesystem=filesystem, **kwargs
             )
 
             result = pa_table.to_pandas(**to_pandas_kwargs)
@@ -279,6 +313,7 @@ def write(
         index=None,
         partition_cols=None,
         storage_options: StorageOptions = None,
+        filesystem=None,
         **kwargs,
     ) -> None:
         self.validate_dataframe(df)
@@ -294,6 +329,11 @@ def write(
         if partition_cols is not None:
             kwargs["file_scheme"] = "hive"
 
+        if filesystem is not None:
+            raise NotImplementedError(
+                "filesystem is not implemented for the fastparquet engine."
+            )
+
         # cannot use get_handle as write() does not accept file buffers
         path = stringify_path(path)
         if is_fsspec_url(path):
@@ -319,7 +359,12 @@ def write(
         )
 
     def read(
-        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
+        self,
+        path,
+        columns=None,
+        storage_options: StorageOptions = None,
+        filesystem=None,
+        **kwargs,
     ) -> DataFrame:
         parquet_kwargs: dict[str, Any] = {}
         use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
@@ -337,6 +382,10 @@ def read(
                 "The 'dtype_backend' argument is not supported for the "
                 "fastparquet engine"
             )
+        if filesystem is not None:
+            raise NotImplementedError(
+                "filesystem is not implemented for the fastparquet engine."
+            )
         path = stringify_path(path)
         handles = None
         if is_fsspec_url(path):
@@ -376,6 +425,7 @@ def to_parquet(
     index: bool | None = None,
     storage_options: StorageOptions = None,
     partition_cols: list[str] | None = None,
+    filesystem: Any = None,
     **kwargs,
 ) -> bytes | None:
     """
@@ -398,6 +448,12 @@ def to_parquet(
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
         behavior is to try 'pyarrow', falling back to 'fastparquet' if
         'pyarrow' is unavailable.
+
+        When using the ``'pyarrow'`` engine, if no storage options are provided
+        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
+        (e.g. "s3://"), the ``pyarrow.fs`` filesystem is attempted first.
+        Use the ``filesystem`` keyword with an instantiated fsspec filesystem
+        if you wish to use its implementation.
     compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
         default 'snappy'. Name of the compression to use. Use ``None``
         for no compression. The supported compression methods actually
@@ -420,6 +476,12 @@ def to_parquet(
 
         .. versionadded:: 1.2.0
 
+    filesystem : fsspec or pyarrow filesystem, default None
+        Filesystem object to use when writing the parquet file. Only implemented
+        for ``engine="pyarrow"``.
+
+        .. versionadded:: 2.1.0
+
     kwargs
         Additional keyword arguments passed to the engine
 
@@ -440,6 +502,7 @@ def to_parquet(
         index=index,
         partition_cols=partition_cols,
         storage_options=storage_options,
+        filesystem=filesystem,
         **kwargs,
     )
 
@@ -458,6 +521,7 @@ def read_parquet(
     storage_options: StorageOptions = None,
     use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
     dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
+    filesystem: Any = None,
     **kwargs,
 ) -> DataFrame:
     """
@@ -480,6 +544,12 @@ def read_parquet(
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
         behavior is to try 'pyarrow', falling back to 'fastparquet' if
         'pyarrow' is unavailable.
+
+        When using the ``'pyarrow'`` engine, if no storage options are provided
+        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
+        (e.g. "s3://"), the ``pyarrow.fs`` filesystem is attempted first.
+        Use the ``filesystem`` keyword with an instantiated fsspec filesystem
+        if you wish to use its implementation.
     columns : list, default=None
         If not None, only these columns will be read from the file.
 
@@ -508,6 +578,12 @@ def read_parquet(
 
        .. versionadded:: 2.0
 
+    filesystem : fsspec or pyarrow filesystem, default None
+        Filesystem object to use when reading the parquet file. Only implemented
+        for ``engine="pyarrow"``.
+
+        .. versionadded:: 2.1.0
+
     **kwargs
         Any additional kwargs are passed to the engine.
 
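The docstring additions above describe a resolution order that the `_get_path_or_handle` changes earlier in this patch implement: when no ``storage_options`` are given, ``pyarrow.fs`` is probed first and fsspec is the fallback. A minimal standalone sketch of that order, assuming pyarrow (and optionally fsspec) is installed; the helper name ``resolve_filesystem`` and the example URI are illustrative, not part of the pandas API:

```python
import pyarrow as pa
import pyarrow.fs as pa_fs


def resolve_filesystem(uri, storage_options=None):
    """Return a (filesystem, path) pair for *uri*, preferring pyarrow.fs."""
    if storage_options is None:
        try:
            # pyarrow natively understands schemes such as "s3://",
            # "gcs://" and "file://"; unknown schemes raise ArrowInvalid.
            return pa_fs.FileSystem.from_uri(uri)
        except (TypeError, pa.ArrowInvalid):
            pass  # fall through to fsspec below
    # fsspec accepts storage_options; pyarrow's from_uri does not.
    import fsspec

    return fsspec.core.url_to_fs(uri, **(storage_options or {}))


fs, path = resolve_filesystem("file:///tmp")
print(type(fs).__name__, path)  # e.g. "LocalFileSystem /tmp"
```

Passing an instantiated filesystem through the new ``filesystem`` keyword skips this probing entirely, which is how a caller can force the fsspec implementation for a scheme both libraries support.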
@@ -537,5 +613,6 @@ def read_parquet(
         storage_options=storage_options,
         use_nullable_dtypes=use_nullable_dtypes,
         dtype_backend=dtype_backend,
+        filesystem=filesystem,
         **kwargs,
     )
diff --git a/pandas/tests/io/test_gcs.py b/pandas/tests/io/test_gcs.py
index a609c1b5fc03d..b65a19d766976 100644
--- a/pandas/tests/io/test_gcs.py
+++ b/pandas/tests/io/test_gcs.py
@@ -1,5 +1,6 @@
 from io import BytesIO
 import os
+import pathlib
 import tarfile
 import zipfile
 
@@ -20,7 +21,7 @@
 
 
 @pytest.fixture
-def gcs_buffer(monkeypatch):
+def gcs_buffer():
     """Emulate GCS using a binary buffer."""
     import fsspec
 
@@ -45,7 +46,7 @@ def ls(self, path, **kwargs):
 
 @td.skip_if_no("gcsfs")
 @pytest.mark.parametrize("format", ["csv", "json", "parquet", "excel", "markdown"])
-def test_to_read_gcs(gcs_buffer, format):
+def test_to_read_gcs(gcs_buffer, format, monkeypatch, capsys):
     """
     Test that many to/read functions support GCS.
 
@@ -75,8 +76,21 @@ def test_to_read_gcs(gcs_buffer, format):
         df2 = read_json(path, convert_dates=["dt"])
     elif format == "parquet":
         pytest.importorskip("pyarrow")
-        df1.to_parquet(path)
-        df2 = read_parquet(path)
+        pa_fs = pytest.importorskip("pyarrow.fs")
+
+        class MockFileSystem(pa_fs.FileSystem):
+            @staticmethod
+            def from_uri(path):
+                print("Using pyarrow filesystem")
+                to_local = pathlib.Path(path.replace("gs://", "")).absolute().as_uri()
+                return pa_fs.LocalFileSystem(to_local)
+
+        with monkeypatch.context() as m:
+            m.setattr(pa_fs, "FileSystem", MockFileSystem)
+            df1.to_parquet(path)
+            df2 = read_parquet(path)
+        captured = capsys.readouterr()
+        assert captured.out == "Using pyarrow filesystem\nUsing pyarrow filesystem\n"
     elif format == "markdown":
         pytest.importorskip("tabulate")
         df1.to_markdown(path)
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 1548208c7eeaa..4ba3776bf6063 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -1211,6 +1211,66 @@ def test_bytes_file_name(self, engine):
             result = read_parquet(path, engine=engine)
             tm.assert_frame_equal(result, df)
 
+    def test_filesystem_notimplemented(self):
+        pytest.importorskip("fastparquet")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                NotImplementedError, match="filesystem is not implemented"
+            ):
+                df.to_parquet(path, engine="fastparquet", filesystem="foo")
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                NotImplementedError, match="filesystem is not implemented"
+            ):
+                read_parquet(path, engine="fastparquet", filesystem="foo")
+
+    def test_invalid_filesystem(self):
+        pytest.importorskip("pyarrow")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
+            ):
+                df.to_parquet(path, engine="pyarrow", filesystem="foo")
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
+            ):
+                read_parquet(path, engine="pyarrow", filesystem="foo")
+
+    def test_unsupported_pa_filesystem_storage_options(self):
+        pa_fs = pytest.importorskip("pyarrow.fs")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                NotImplementedError,
+                match="storage_options not supported with a pyarrow FileSystem.",
+            ):
+                df.to_parquet(
+                    path,
+                    engine="pyarrow",
+                    filesystem=pa_fs.LocalFileSystem(),
storage_options={"foo": "bar"}, + ) + + with tm.ensure_clean() as path: + pathlib.Path(path).write_bytes(b"foo") + with pytest.raises( + NotImplementedError, + match="storage_options not supported with a pyarrow FileSystem.", + ): + read_parquet( + path, + engine="pyarrow", + filesystem=pa_fs.LocalFileSystem(), + storage_options={"foo": "bar"}, + ) + def test_invalid_dtype_backend(self, engine): msg = ( "dtype_backend numpy is invalid, only 'numpy_nullable' and "