
UUIDType with BucketTransform incorrectly converts int to str in PartitionKey #2002

Open
@DinGo4DEV


Apache Iceberg version

0.9.0 (latest release)

Please describe the bug 🐞

Description

When a UUIDType column is used with a BucketTransform partition, an error occurs during table operations such as upsert. The issue appears to be that the partition value is converted from int to str, which causes a type mismatch when the Avro encoder attempts to write an integer.

Steps to Reproduce

  1. Create a table with a UUIDType column
  2. Configure the table to use BucketTransform on that column for partitioning
  3. Attempt to upsert data into the table (a reproduction sketch follows this list)
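
A minimal reproduction sketch of the scenario above. The catalog setup, table name, and the exact Arrow representation of the UUID column are illustrative assumptions, not taken from the original report:

import uuid

import pyarrow as pa

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import BucketTransform
from pyiceberg.types import NestedField, StringType, UUIDType

# Illustrative local catalog; any catalog should do.
catalog = SqlCatalog("default", uri="sqlite:///:memory:", warehouse="file:///tmp/warehouse")
catalog.create_namespace("default")

schema = Schema(
    NestedField(field_id=1, name="id", field_type=UUIDType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
    identifier_field_ids=[1],  # upsert joins on "id"
)
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=BucketTransform(num_buckets=16), name="id_bucket")
)
table = catalog.create_table("default.uuid_bucket_demo", schema=schema, partition_spec=spec)

rows = pa.table(
    {
        # UUIDs as 16-byte fixed binary; the Arrow type expected for UUID
        # columns may vary with the pyiceberg/pyarrow versions in use.
        "id": pa.array([uuid.uuid4().bytes], type=pa.binary(16)),
        "name": pa.array(["alice"]),
    }
)
table.upsert(rows)  # fails while writing the manifest: the bucket value is "1" instead of 1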

Current Behavior

The operation fails with a TypeError as the system attempts to perform integer operations on a string value.

Expected Behavior

The operation should properly handle UUIDType columns when used with BucketTransform partitioning. The UUID bucket partition value should be the int 1, not the string "1".
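
For reference, the bucket transform itself produces a plain int for a UUID value, which is what the partition record is expected to carry. A small check against pyiceberg's Transform API (output value is only an example):

import uuid

from pyiceberg.transforms import BucketTransform
from pyiceberg.types import UUIDType

# BucketTransform.transform(source_type) returns a callable that buckets a value.
bucket = BucketTransform(num_buckets=16).transform(UUIDType())(uuid.uuid4())
print(type(bucket), bucket)  # <class 'int'>, e.g. 1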

Error Stack Trace

Traceback (most recent call last):
    File "test_upsert.py", line 248, in <module>
        result = table.upsert(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\__init__.py", line 1216, in upsert
        tx.append(rows_to_insert)
    File ".venv\Lib\site-packages\pyiceberg\table\__init__.py", line 470, in append
        with self._append_snapshot_producer(snapshot_properties) as append_files:
    File ".venv\Lib\site-packages\pyiceberg\table\update\__init__.py", line 71, in __exit__
        self.commit()
    File ".venv\Lib\site-packages\pyiceberg\table\update\__init__.py", line 67, in commit
        self._transaction._apply(*self._commit())
                                                            ^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\update\snapshot.py", line 242, in _commit
        new_manifests = self._manifests()
                                        ^^^^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\update\snapshot.py", line 201, in _manifests
        return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^
    File "~\Python312\Lib\concurrent\futures\_base.py", line 456, in result
        return self.__get_result()
                     ^^^^^^^^^^^^^^^^^^^
    File "~\Python312\Lib\concurrent\futures\_base.py", line 401, in __get_result
        raise self._exception
    File "~\Python312\Lib\concurrent\futures\thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\update\snapshot.py", line 159, in _write_added_manifest        
        writer.add(
    File ".venv\Lib\site-packages\pyiceberg\manifest.py", line 847, in add
        self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
    File ".venv\Lib\site-packages\pyiceberg\manifest.py", line 840, in add_entry
        self._writer.write_block([self.prepare_entry(entry)])
    File ".venv\Lib\site-packages\pyiceberg\avro\file.py", line 281, in write_block       
        self.writer.write(block_content_encoder, obj)
        writer.write(encoder, val[pos] if pos is not None else None)
    File ".venv\Lib\site-packages\pyiceberg\avro\writer.py", line 176, in write
        writer.write(encoder, val[pos] if pos is not None else None)
        writer.write(encoder, val[pos] if pos is not None else None)
    File ".venv\Lib\site-packages\pyiceberg\avro\writer.py", line 176, in write
        writer.write(encoder, val[pos] if pos is not None else None)
    File ".venv\Lib\site-packages\pyiceberg\avro\writer.py", line 66, in write
        encoder.write_int(val)
    File ".venv\Lib\site-packages\pyiceberg\avro\encoder.py", line 45, in write_int
        datum = (integer << 1) ^ (integer >> 63)

Potential Fix

The issue appears to be in the type handling of the partition_record_value function when a PartitionKey is initialized with PartitionFieldValues.

@dataclass(frozen=True)
class PartitionKey:
    field_values: List[PartitionFieldValue]
    partition_spec: PartitionSpec
    schema: Schema

    @cached_property
    def partition(self) -> Record:  # partition key transformed with iceberg internal representation as input
        iceberg_typed_key_values = []
        for raw_partition_field_value in self.field_values:
            partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
            if len(partition_fields) != 1:
                raise ValueError(f"Cannot have redundant partitions: {partition_fields}")
            partition_field = partition_fields[0]
            iceberg_typed_key_values.append(
                partition_record_value(
                    partition_field=partition_field,
                    value=raw_partition_field_value.value,
                    schema=self.schema,
                )
            )
        return Record(*iceberg_typed_key_values)
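
Concretely, by the time the UUIDType handler runs, the value has already been bucketed to an int, yet it still gets stringified. An illustrative check, assuming the private _to_partition_representation dispatch behind partition_record_value is importable from pyiceberg.partitioning as in 0.9.0:

from pyiceberg.partitioning import _to_partition_representation
from pyiceberg.types import UUIDType

bucketed = 1  # example output of BucketTransform on a UUID value
print(repr(_to_partition_representation(UUIDType(), bucketed)))  # '1' (str), which later breaks encoder.write_int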

Adding a Union type for value would let the handler below deal with the already-transformed value (the int produced by BucketTransform), which it currently stringifies; a sketch of that change follows the current handler.

@_to_partition_representation.register(UUIDType)
def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]:
    return str(value) if value is not None else None
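
One possible shape of that change, as an illustrative sketch rather than the final fix: accept the already-bucketed int alongside a real UUID and only stringify the latter.

from typing import Optional, Union
import uuid

@_to_partition_representation.register(UUIDType)
def _(type: IcebergType, value: Optional[Union[uuid.UUID, int]]) -> Optional[Union[str, int]]:
    if value is None:
        return None
    if isinstance(value, int):
        # Value was already transformed (e.g. by BucketTransform); keep the int
        # so the Avro writer can encode it as an integer.
        return value
    return str(value)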

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
