
CLN: File handling for PyArrow parquet #37828


Merged: 1 commit, Nov 18, 2020
pandas/io/parquet.py: 128 changes (74 additions, 54 deletions)
@@ -2,7 +2,7 @@
 
 import io
 import os
-from typing import Any, AnyStr, Dict, List, Optional
+from typing import Any, AnyStr, Dict, List, Optional, Tuple
 from warnings import catch_warnings
 
 from pandas._typing import FilePathOrBuffer, StorageOptions
@@ -11,7 +11,7 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import get_handle, is_fsspec_url, stringify_path
+from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -48,6 +48,40 @@ def get_engine(engine: str) -> "BaseImpl":
     raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
 
 
+def _get_path_or_handle(
+    path: FilePathOrBuffer,
+    fs: Any,
+    storage_options: StorageOptions = None,
+    mode: str = "rb",
+    is_dir: bool = False,
+) -> Tuple[FilePathOrBuffer, Optional[IOHandles], Any]:
+    """File handling for PyArrow."""
+    path_or_handle = stringify_path(path)
+    if is_fsspec_url(path_or_handle) and fs is None:
+        fsspec = import_optional_dependency("fsspec")
+
+        fs, path_or_handle = fsspec.core.url_to_fs(
+            path_or_handle, **(storage_options or {})
+        )
+    elif storage_options:
+        raise ValueError("storage_options passed with buffer or non-fsspec filepath")
+
+    handles = None
+    if (
+        not fs
+        and not is_dir
+        and isinstance(path_or_handle, str)
+        and not os.path.isdir(path_or_handle)
+    ):
+        # use get_handle only when we are very certain that it is not a directory
+        # fsspec resources can also point to directories
+        # this branch is used for example when reading from non-fsspec URLs
+        handles = get_handle(path_or_handle, mode, is_text=False)
+        fs = None
+        path_or_handle = handles.handle
+    return path_or_handle, handles, fs
+
+
 class BaseImpl:
     @staticmethod
     def validate_dataframe(df: DataFrame):
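For illustration, a minimal sketch of how callers consume the helper's return triple. _get_path_or_handle is a private helper, so the import below is an assumption that may break between pandas versions, and the file name is made up for the example.

import pandas as pd

from pandas.io.parquet import _get_path_or_handle  # private helper, may move

pd.DataFrame({"a": [1, 2]}).to_parquet("data.parquet")  # create a local file

path_or_handle, handles, fs = _get_path_or_handle(
    "data.parquet", None, storage_options=None, mode="rb"
)
try:
    # local file: path_or_handle is an open buffer, handles owns it, fs is None
    # fsspec URL: path_or_handle stays a path string and fs is the filesystem
    print(type(path_or_handle), handles is not None, fs)
finally:
    if handles is not None:
        handles.close()  # close only what the helper opened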
@@ -103,64 +137,50 @@ def write(
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
 
-        path = stringify_path(path)
-        # get_handle could be used here (for write_table, not for write_to_dataset)
-        # but it would complicate the code.
-        if is_fsspec_url(path) and "filesystem" not in kwargs:
-            # make fsspec instance, which pyarrow will use to open paths
-            fsspec = import_optional_dependency("fsspec")
-
-            fs, path = fsspec.core.url_to_fs(path, **(storage_options or {}))
-            kwargs["filesystem"] = fs
-
-        elif storage_options:
-            raise ValueError(
-                "storage_options passed with file object or non-fsspec file path"
-            )
-
-        if partition_cols is not None:
-            # writes to multiple files under the given path
-            self.api.parquet.write_to_dataset(
-                table,
-                path,
-                compression=compression,
-                partition_cols=partition_cols,
-                **kwargs,
-            )
-        else:
-            # write to single output file
-            self.api.parquet.write_table(table, path, compression=compression, **kwargs)
+        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+            path,
+            kwargs.pop("filesystem", None),
+            storage_options=storage_options,
+            mode="wb",
+            is_dir=partition_cols is not None,
+        )
+        try:
+            if partition_cols is not None:
+                # writes to multiple files under the given path
+                self.api.parquet.write_to_dataset(
+                    table,
+                    path_or_handle,
+                    compression=compression,
+                    partition_cols=partition_cols,
+                    **kwargs,
+                )
+            else:
+                # write to single output file
+                self.api.parquet.write_table(
+                    table, path_or_handle, compression=compression, **kwargs
+                )
+        finally:
+            if handles is not None:
+                handles.close()
 
     def read(
         self, path, columns=None, storage_options: StorageOptions = None, **kwargs
     ):
-        path = stringify_path(path)
-        handles = None
-        fs = kwargs.pop("filesystem", None)
-        if is_fsspec_url(path) and fs is None:
-            fsspec = import_optional_dependency("fsspec")
-
-            fs, path = fsspec.core.url_to_fs(path, **(storage_options or {}))
-        elif storage_options:
-            raise ValueError(
-                "storage_options passed with buffer or non-fsspec filepath"
-            )
-        if not fs and isinstance(path, str) and not os.path.isdir(path):
-            # use get_handle only when we are very certain that it is not a directory
-            # fsspec resources can also point to directories
-            # this branch is used for example when reading from non-fsspec URLs
-            handles = get_handle(path, "rb", is_text=False)
-            path = handles.handle
-
-        kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, filesystem=fs, **kwargs
-        ).to_pandas()
-
-        if handles is not None:
-            handles.close()
-
-        return result
+        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+            path,
+            kwargs.pop("filesystem", None),
+            storage_options=storage_options,
+            mode="rb",
+        )
+        try:
+            return self.api.parquet.read_table(
+                path_or_handle, columns=columns, **kwargs
+            ).to_pandas()
+        finally:
+            if handles is not None:
+                handles.close()
 
 
 class FastParquetImpl(BaseImpl):
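For context, a short sketch of the user-facing contract this refactor keeps: storage_options are only accepted together with an fsspec URL; otherwise the pyarrow engine raises the ValueError defined in _get_path_or_handle. The s3 URL below is illustrative.

import io

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# works: storage_options are forwarded to fsspec for URLs such as s3:// or gs://
# df.to_parquet("s3://my-bucket/data.parquet", storage_options={"anon": True})

# raises: a buffer is not an fsspec path, so storage_options are rejected
buf = io.BytesIO()
try:
    df.to_parquet(buf, engine="pyarrow", storage_options={"anon": True})
except ValueError as err:
    print(err)  # storage_options passed with buffer or non-fsspec filepath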
pandas/tests/io/test_parquet.py: 3 changes (3 additions, 0 deletions)
@@ -691,6 +691,7 @@ def test_partition_cols_supported(self, pa, df_full):
         dataset = pq.ParquetDataset(path, validate_schema=False)
         assert len(dataset.partitions.partition_names) == 2
         assert dataset.partitions.partition_names == set(partition_cols)
+        assert read_parquet(path).shape == df.shape
Review comment (PR author):

Making sure that read_parquet is more tested when reading from a directory.

The read DataFrame has shuffled rows. Is that expected?
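A note on the question above: write_to_dataset groups rows by partition value before writing, so the read-back row order (and index) generally differs from the original, which is why the assertion compares shapes only. A sketch of an order-insensitive comparison, with illustrative data and path:

import pandas as pd
import pandas._testing as tm

df = pd.DataFrame({"part": ["b", "a", "b"], "value": [1, 2, 3]})
df.to_parquet("partitioned_dataset", partition_cols=["part"])

result = pd.read_parquet("partitioned_dataset")
# partition columns come back as categoricals parsed from directory names
result["part"] = result["part"].astype(str)
result = result[df.columns].sort_values(["part", "value"]).reset_index(drop=True)
expected = df.sort_values(["part", "value"]).reset_index(drop=True)
tm.assert_frame_equal(result, expected)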

     def test_partition_cols_string(self, pa, df_full):
         # GH #27117
@@ -704,6 +704,7 @@
         dataset = pq.ParquetDataset(path, validate_schema=False)
         assert len(dataset.partitions.partition_names) == 1
         assert dataset.partitions.partition_names == set(partition_cols_list)
+        assert read_parquet(path).shape == df.shape
 
     @pytest.mark.parametrize("path_type", [str, pathlib.Path])
     def test_partition_cols_pathlib(self, pa, df_compat, path_type):
@@ -716,6 +718,7 @@ def test_partition_cols_pathlib(self, pa, df_compat, path_type):
         with tm.ensure_clean_dir() as path_str:
             path = path_type(path_str)
             df.to_parquet(path, partition_cols=partition_cols_list)
+            assert read_parquet(path).shape == df.shape
 
     def test_empty_dataframe(self, pa):
         # GH #27339
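Finally, a minimal end-to-end version of what the new assertions exercise; temporary-directory handling is simplified here relative to tm.ensure_clean_dir, and the column names are illustrative.

import pathlib
import tempfile

import pandas as pd

df = pd.DataFrame({"a": [1, 2], "part": ["x", "y"]})

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "dataset"
    df.to_parquet(path, partition_cols=["part"])
    # reading a directory goes through the is_dir branch of _get_path_or_handle:
    # no file handle is opened and pyarrow walks the dataset directory itself
    assert pd.read_parquet(path).shape == df.shape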