Skip to content

Use Iceberg-Rust for parsing the ManifestList and Manifests #2004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
2,015 changes: 1,092 additions & 923 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
manifest_lists_to_delete = set()
manifests_to_delete: List[ManifestFile] = []
for snapshot in metadata.snapshots:
manifests_to_delete += snapshot.manifests(io)
manifests_to_delete += snapshot.manifests(table, io)
manifest_lists_to_delete.add(snapshot.manifest_list)

manifest_paths_to_delete = {manifest.manifest_path for manifest in manifests_to_delete}
Expand Down
101 changes: 83 additions & 18 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from enum import Enum
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
Expand Down Expand Up @@ -57,6 +58,9 @@
StructType,
)

if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2
Expand Down Expand Up @@ -704,25 +708,85 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
Returns:
An Iterator of manifest entries.
"""
input_file = io.new_input(self.manifest_path)
with AvroFile[ManifestEntry](
input_file,
MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
read_types={-1: ManifestEntry, 2: DataFile},
read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
) as reader:
return [
_inherit_from_manifest(entry, self)
for entry in reader
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
from pyiceberg_core import manifest

bs = io.new_input(self.manifest_path).open().read()
manifest = manifest.read_manifest_entries(bs)

# TODO: Stop converting the types. Converting them is
# the easiest approach for now, until the write part
# is in there as well.
def _convert_entry(entry: Any) -> ManifestEntry:
    """Build a ManifestEntry (including its DataFile) from a parsed manifest entry.

    Args:
        entry: One of the objects yielded by the parsed manifest's ``entries()``.

    Returns:
        A ManifestEntry whose fields are filled positionally from ``entry``.
    """
    source = entry.data_file

    converted_file = DataFile(
        DataFileContent(source.content),
        source.file_path,
        # NOTE: the file format is hard-coded to Parquet for now; the
        # FileFormat(source.file_format) conversion is not enabled yet.
        FileFormat.PARQUET,
        source.partition,
        source.record_count,
        source.file_size_in_bytes,
        source.column_sizes,
        source.value_counts,
        source.null_value_counts,
        source.nan_value_counts,
        source.lower_bounds,
        source.upper_bounds,
        source.key_metadata,
        source.split_offsets,
        source.equality_ids,
        source.sort_order_id,
    )

    entry_status = ManifestEntryStatus(entry.status)
    return ManifestEntry(
        entry_status,
        entry.snapshot_id,
        entry.sequence_number,
        entry.file_sequence_number,
        converted_file,
    )

return [
_inherit_from_manifest(_convert_entry(entry), self)
for entry in manifest.entries()
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list, table: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str, table: "TableMetadata") -> Tuple[ManifestFile, ...]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass in the TableMetadata so that the PartitionSpecs are known. I would prefer not to have to do this. There is a discussion on the Rust side here: apache/iceberg-rust#1328 (comment)

"""Read and cache manifests from the given manifest list, returning a tuple to prevent modification."""
file = io.new_input(manifest_list)
return tuple(read_manifest_list(file))
bs = io.new_input(manifest_list).open().read()
from pyiceberg_core import manifest

def partition_spec(spec_id: int) -> str:
    """Return the JSON dump of the partition struct for the spec with the given id.

    The spec is looked up in ``table.specs()``, its partition type is derived
    from ``table.schema()``, and the resulting fields are wrapped in a Schema
    whose struct is serialized with ``model_dump_json()``.
    """
    partition_type = table.specs()[spec_id].partition_type(table.schema())
    return Schema(*partition_type.fields).as_struct().model_dump_json()

cb = manifest.PartitionSpecProviderCallbackHolder(partition_spec)

return tuple(
ManifestFile(
manifest.manifest_path,
manifest.manifest_length,
manifest.partition_spec_id,
manifest.content,
manifest.sequence_number,
manifest.min_sequence_number,
manifest.added_snapshot_id,
manifest.added_files_count,
manifest.existing_files_count,
manifest.deleted_files_count,
manifest.added_rows_count,
manifest.existing_rows_count,
manifest.deleted_rows_count,
manifest.partitions,
manifest.key_metadata,
)
for manifest in manifest.read_manifest_list(bs, cb).entries()
)


def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
Expand Down Expand Up @@ -1093,6 +1157,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"format-version": "2",
"content": "data",
},
)
self._commit_snapshot_id = snapshot_id
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1780,7 +1780,7 @@ def plan_files(self) -> Iterable[FileScanTask]:

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
for manifest_file in snapshot.manifests(self.io, self.table_metadata)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ def __str__(self) -> str:
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
return result_str

def manifests(self, io: FileIO) -> List[ManifestFile]:
def manifests(self, io: FileIO, table: TableMetadata) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
return list(_manifests(io, self.manifest_list))
return list(_manifests(io, self.manifest_list, table))


class MetadataLogEntry(IcebergBaseModel):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ psycopg2-binary = { version = ">=2.9.6", optional = true }
sqlalchemy = { version = "^2.0.18", optional = true }
getdaft = { version = ">=0.2.12", optional = true }
cachetools = "^5.5.0"
pyiceberg-core = { version = "^0.4.0", optional = true }
pyiceberg-core = { file = "/Users/fokko.driesprong/work/iceberg-rust/bindings/python/dist/pyiceberg_core-0.4.0-cp39-abi3-macosx_11_0_arm64.whl" }
polars = { version = "^1.21.0", optional = true }
thrift-sasl = { version = ">=0.4.3", optional = true }
kerberos = {version = "^1.3.1", optional = true}
Expand Down
29 changes: 27 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
)
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Accessor, Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -1847,15 +1849,38 @@ def simple_map() -> MapType:


@pytest.fixture(scope="session")
def generated_manifest_entry_file(avro_schema_manifest_entry: Dict[str, Any]) -> Generator[str, None, None]:
def test_schema() -> Schema:
    """Return the two-field integer schema (VendorID, tpep_pickup_datetime) shared by the manifest tests."""
    vendor_id = NestedField(1, "VendorID", IntegerType(), False)
    pickup_datetime = NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
    return Schema(vendor_id, pickup_datetime)


@pytest.fixture(scope="session")
def test_partition_spec() -> PartitionSpec:
    """Return a partition spec with identity transforms on both test columns.

    The two fields use source ids 1 and 2 and partition field ids 1000 and
    1001, named "VendorID" and "tpep_pickup_datetime".
    """
    # The annotation previously claimed ``Schema``; the fixture returns a PartitionSpec.
    return PartitionSpec(
        PartitionField(1, 1000, IdentityTransform(), "VendorID"),
        PartitionField(2, 1001, IdentityTransform(), "tpep_pickup_datetime"),
    )


@pytest.fixture(scope="session")
def generated_manifest_entry_file(
avro_schema_manifest_entry: Dict[str, Any], test_schema: Schema, test_partition_spec: PartitionSpec
) -> Generator[str, None, None]:
from fastavro import parse_schema, writer

parsed_schema = parse_schema(avro_schema_manifest_entry)

with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/manifest.avro"
with open(tmp_avro_file, "wb") as out:
writer(out, parsed_schema, manifest_entry_records)
writer(
out,
parsed_schema,
manifest_entry_records,
metadata={
"schema": test_schema.model_dump_json(),
"partition-spec": test_partition_spec.fields,
},
)
yield tmp_avro_file


Expand Down
5 changes: 1 addition & 4 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def test_write_empty_manifest() -> None:

@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest(
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion, test_schema: Schema
) -> None:
io = load_file_io()
snapshot = Snapshot(
Expand All @@ -370,9 +370,6 @@ def test_write_manifest(
)
demo_manifest_file = snapshot.manifests(io)[0]
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
test_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
)
test_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
Expand Down
Loading