diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index c76e18ae353a0..1f90da2f57579 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -2,7 +2,7 @@ import io import os -from typing import Any, AnyStr, Dict, List, Optional +from typing import Any, AnyStr, Dict, List, Optional, Tuple from warnings import catch_warnings from pandas._typing import FilePathOrBuffer, StorageOptions @@ -11,7 +11,7 @@ from pandas import DataFrame, get_option -from pandas.io.common import get_handle, is_fsspec_url, stringify_path +from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path def get_engine(engine: str) -> "BaseImpl": @@ -48,6 +48,40 @@ def get_engine(engine: str) -> "BaseImpl": raise ValueError("engine must be one of 'pyarrow', 'fastparquet'") +def _get_path_or_handle( + path: FilePathOrBuffer, + fs: Any, + storage_options: StorageOptions = None, + mode: str = "rb", + is_dir: bool = False, +) -> Tuple[FilePathOrBuffer, Optional[IOHandles], Any]: + """File handling for PyArrow.""" + path_or_handle = stringify_path(path) + if is_fsspec_url(path_or_handle) and fs is None: + fsspec = import_optional_dependency("fsspec") + + fs, path_or_handle = fsspec.core.url_to_fs( + path_or_handle, **(storage_options or {}) + ) + elif storage_options: + raise ValueError("storage_options passed with buffer or non-fsspec filepath") + + handles = None + if ( + not fs + and not is_dir + and isinstance(path_or_handle, str) + and not os.path.isdir(path_or_handle) + ): + # use get_handle only when we are very certain that it is not a directory + # fsspec resources can also point to directories + # this branch is used for example when reading from non-fsspec URLs + handles = get_handle(path_or_handle, mode, is_text=False) + fs = None + path_or_handle = handles.handle + return path_or_handle, handles, fs + + class BaseImpl: @staticmethod def validate_dataframe(df: DataFrame): @@ -103,64 +137,50 @@ def write( table = self.api.Table.from_pandas(df, **from_pandas_kwargs) - path = stringify_path(path) - # get_handle could be used here (for write_table, not for write_to_dataset) - # but it would complicate the code. - if is_fsspec_url(path) and "filesystem" not in kwargs: - # make fsspec instance, which pyarrow will use to open paths - fsspec = import_optional_dependency("fsspec") - - fs, path = fsspec.core.url_to_fs(path, **(storage_options or {})) - kwargs["filesystem"] = fs - - elif storage_options: - raise ValueError( - "storage_options passed with file object or non-fsspec file path" - ) - - if partition_cols is not None: - # writes to multiple files under the given path - self.api.parquet.write_to_dataset( - table, - path, - compression=compression, - partition_cols=partition_cols, - **kwargs, - ) - else: - # write to single output file - self.api.parquet.write_table(table, path, compression=compression, **kwargs) + path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle( + path, + kwargs.pop("filesystem", None), + storage_options=storage_options, + mode="wb", + is_dir=partition_cols is not None, + ) + try: + if partition_cols is not None: + # writes to multiple files under the given path + self.api.parquet.write_to_dataset( + table, + path_or_handle, + compression=compression, + partition_cols=partition_cols, + **kwargs, + ) + else: + # write to single output file + self.api.parquet.write_table( + table, path_or_handle, compression=compression, **kwargs + ) + finally: + if handles is not None: + handles.close() def read( self, path, columns=None, storage_options: StorageOptions = None, **kwargs ): - path = stringify_path(path) - handles = None - fs = kwargs.pop("filesystem", None) - if is_fsspec_url(path) and fs is None: - fsspec = import_optional_dependency("fsspec") - - fs, path = fsspec.core.url_to_fs(path, **(storage_options or {})) - elif storage_options: - raise ValueError( - "storage_options passed with buffer or non-fsspec filepath" - ) - if not fs and isinstance(path, str) and not os.path.isdir(path): - # use get_handle only when we are very certain that it is not a directory - # fsspec resources can also point to directories - # this branch is used for example when reading from non-fsspec URLs - handles = get_handle(path, "rb", is_text=False) - path = handles.handle - kwargs["use_pandas_metadata"] = True - result = self.api.parquet.read_table( - path, columns=columns, filesystem=fs, **kwargs - ).to_pandas() - if handles is not None: - handles.close() - - return result + path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle( + path, + kwargs.pop("filesystem", None), + storage_options=storage_options, + mode="rb", + ) + try: + return self.api.parquet.read_table( + path_or_handle, columns=columns, **kwargs + ).to_pandas() + finally: + if handles is not None: + handles.close() class FastParquetImpl(BaseImpl): diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 123e115cd2f2a..e5fffb0e3a3e8 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -691,6 +691,7 @@ def test_partition_cols_supported(self, pa, df_full): dataset = pq.ParquetDataset(path, validate_schema=False) assert len(dataset.partitions.partition_names) == 2 assert dataset.partitions.partition_names == set(partition_cols) + assert read_parquet(path).shape == df.shape def test_partition_cols_string(self, pa, df_full): # GH #27117 @@ -704,6 +705,7 @@ def test_partition_cols_string(self, pa, df_full): dataset = pq.ParquetDataset(path, validate_schema=False) assert len(dataset.partitions.partition_names) == 1 assert dataset.partitions.partition_names == set(partition_cols_list) + assert read_parquet(path).shape == df.shape @pytest.mark.parametrize("path_type", [str, pathlib.Path]) def test_partition_cols_pathlib(self, pa, df_compat, path_type): @@ -716,6 +718,7 @@ def test_partition_cols_pathlib(self, pa, df_compat, path_type): with tm.ensure_clean_dir() as path_str: path = path_type(path_str) df.to_parquet(path, partition_cols=partition_cols_list) + assert read_parquet(path).shape == df.shape def test_empty_dataframe(self, pa): # GH #27339