Skip to content

Added ExpireSnapshots Feature #1880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0a94d96
Added initial unit tests and class for removing a snapshot
ForeverAngry Mar 29, 2025
5f0b62b
Added methods needed to expire snapshots by id, and optionally cleanu…
ForeverAngry Mar 31, 2025
f995daa
Update test_expire_snapshots.py
ForeverAngry Mar 31, 2025
65365e1
Added the builder method to __init__.py, updated the snapshot api wit…
ForeverAngry Apr 1, 2025
e28815f
Snapshots are not being transacted on, but need to re-assign refs
ForeverAngry Apr 1, 2025
4628ede
Fixed the test case.
ForeverAngry Apr 3, 2025
e80c41c
adding print statements to help with debugging
ForeverAngry Apr 3, 2025
cb9f0c9
Draft ready
ForeverAngry Apr 3, 2025
ebcff2b
Applied suggestions to Fix CICD
ForeverAngry Apr 3, 2025
97399bf
Merge branch 'main' into main
ForeverAngry Apr 3, 2025
95e5af2
Rebuild the poetry lock file.
ForeverAngry Apr 3, 2025
5ab5890
Merge branch 'main' into main
ForeverAngry Apr 4, 2025
5acd690
Refactor implementation of `ExpireSnapshots`
ForeverAngry Apr 13, 2025
d30a08c
Fixed format and linting issues
ForeverAngry Apr 13, 2025
e62ab58
Merge branch 'main' into main
ForeverAngry Apr 13, 2025
1af3258
Fixed format and linting issues
ForeverAngry Apr 13, 2025
352b48f
Merge branch 'main' of https://github.com/ForeverAngry/iceberg-python
ForeverAngry Apr 13, 2025
382e0ea
Merge branch 'main' into main
ForeverAngry Apr 18, 2025
549c183
rebased: from main
ForeverAngry Apr 19, 2025
386cb15
fixed: typo
ForeverAngry Apr 19, 2025
12729fa
removed errant files
ForeverAngry Apr 22, 2025
ce3515c
Added: public method signature to the init table file.
ForeverAngry Apr 22, 2025
28fce4b
Removed: `expire_snapshots_older_than` method, in favor of implementi…
ForeverAngry Apr 24, 2025
2c3153e
Update tests/table/test_expire_snapshots.py
ForeverAngry Apr 26, 2025
27c3ece
Removed: unrelated changes, Added: logic to expire snapshot method.
ForeverAngry Apr 26, 2025
05793c0
Merge branch 'main' into 1880-add-expire-snapshots
ForeverAngry Apr 26, 2025
8ec1889
Update test_partition_evolution.py
ForeverAngry Apr 26, 2025
b23ac6a
Update test_literals.py
ForeverAngry May 10, 2025
5c458f2
Update snapshot.py
ForeverAngry May 10, 2025
a08eb6b
Update snapshot.py
ForeverAngry May 11, 2025
689310d
Fixed: Linting
ForeverAngry May 11, 2025
9031f06
Update test_expire_snapshots.py
ForeverAngry May 17, 2025
3488314
Update test_expire_snapshots.py
ForeverAngry May 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,7 @@
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import (
ManageSnapshots,
UpdateSnapshot,
_FastAppendFiles,
)
from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -1079,6 +1075,15 @@ def manage_snapshots(self) -> ManageSnapshots:
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def expire_snapshots(self) -> ExpireSnapshots:
    """
    Shorthand to expire snapshots of this table by ID.

    The returned builder is bound to a fresh auto-committing transaction on this table.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    """
    auto_commit_transaction = Transaction(self, autocommit=True)
    return ExpireSnapshots(transaction=auto_commit_transaction)

def update_statistics(self) -> UpdateStatistics:
"""
Shorthand to run statistics management operations like add statistics and remove statistics.
Expand Down
69 changes: 69 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from pyiceberg.partitioning import (
PartitionSpec,
)
from pyiceberg.table.refs import SnapshotRefType
from pyiceberg.table.snapshots import (
Operation,
Snapshot,
Expand All @@ -66,6 +67,7 @@
AddSnapshotUpdate,
AssertRefSnapshotId,
RemoveSnapshotRefUpdate,
RemoveSnapshotsUpdate,
SetSnapshotRefUpdate,
TableRequirement,
TableUpdate,
Expand Down Expand Up @@ -739,6 +741,7 @@ class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""

_snapshot_ids_to_expire: Set[int] = set()
_updates: Tuple[TableUpdate, ...] = ()
_requirements: Tuple[TableRequirement, ...] = ()

Expand Down Expand Up @@ -843,3 +846,69 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots:
This for method chaining
"""
return self._remove_ref_snapshot(ref_name=branch_name)


class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
    """
    Expire snapshots by ID.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.
    """

    # Declared only; the set is created per instance in __init__. A class-level
    # `set()` default would be shared by every ExpireSnapshots object, so IDs
    # staged on one builder would leak into all later ones.
    _snapshot_ids_to_expire: Set[int]
    # Tuples are immutable and `+=` rebinds on the instance, so class-level defaults are safe here.
    _updates: Tuple[TableUpdate, ...] = ()
    _requirements: Tuple[TableRequirement, ...] = ()

    def __init__(self, transaction: Transaction) -> None:
        """
        Create a builder with its own, initially empty, set of snapshot IDs to expire.

        Args:
            transaction: The transaction the staged changes are applied to.
        """
        super().__init__(transaction=transaction)
        self._snapshot_ids_to_expire = set()

    def _commit(self) -> UpdatesAndRequirements:
        """
        Commit the staged updates and requirements.

        This will remove the snapshots with the given IDs.

        Returns:
            Tuple of updates and requirements to be committed,
            as required by the calling parent apply functions.
        """
        # Materialize as a sorted list so the emitted update is deterministic
        # regardless of set iteration order.
        update = RemoveSnapshotsUpdate(snapshot_ids=sorted(self._snapshot_ids_to_expire))
        self._updates += (update,)
        return self._updates, self._requirements

    def _get_protected_snapshot_ids(self) -> Set[int]:
        """
        Get the IDs of protected snapshots.

        These are the snapshots referenced by any branch or tag in the table
        metadata. These ids are to be excluded from expiration.

        Returns:
            Set of protected snapshot IDs to exclude from expiration.
        """
        protecting_ref_types = [SnapshotRefType.TAG, SnapshotRefType.BRANCH]
        return {
            ref.snapshot_id
            for ref in self._transaction.table_metadata.refs.values()
            if ref.snapshot_ref_type in protecting_ref_types
        }

    def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots:
        """
        Expire a snapshot by its ID.

        This will mark the snapshot for expiration; nothing is removed until commit.

        Args:
            snapshot_id (int): The ID of the snapshot to expire.

        Returns:
            This for method chaining.

        Raises:
            ValueError: If the snapshot does not exist, or if it is referenced
                by a branch or tag and is therefore protected.
        """
        if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None:
            raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

        if snapshot_id in self._get_protected_snapshot_ids():
            raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

        self._snapshot_ids_to_expire.add(snapshot_id)

        return self
119 changes: 119 additions & 0 deletions tests/table/test_expire_snapshots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest.mock import MagicMock
from uuid import uuid4

import pytest

from pyiceberg.table import CommitTableResponse, Table


def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
    """Expiring the snapshot a branch points at must raise and must not reach the catalog."""
    head_snapshot_id = 3051729675574597004
    other_snapshot_id = 3055729675574597004

    # Swap in a mock catalog so any commit attempt can be detected.
    table_v2.catalog = MagicMock()

    # The "main" branch references the snapshot under test; the tag references another one.
    protecting_refs = {
        "main": MagicMock(snapshot_id=head_snapshot_id, snapshot_ref_type="branch"),
        "tag1": MagicMock(snapshot_id=other_snapshot_id, snapshot_ref_type="tag"),
    }
    table_v2.metadata = table_v2.metadata.model_copy(update={"refs": protecting_refs})

    # Sanity-check the fixture: the snapshot really is referenced.
    referenced_ids = {ref.snapshot_id for ref in table_v2.metadata.refs.values()}
    assert head_snapshot_id in referenced_ids

    expected_message = f"Snapshot with ID {head_snapshot_id} is protected and cannot be expired."
    with pytest.raises(ValueError, match=expected_message):
        table_v2.expire_snapshots().expire_snapshot_by_id(head_snapshot_id).commit()

    table_v2.catalog.commit_table.assert_not_called()


def test_cannot_expire_tagged_snapshot(table_v2: Table) -> None:
    """Expiring a snapshot referenced by a tag must raise and must not reach the catalog."""
    tagged_snapshot_id = 3051729675574597004
    other_snapshot_id = 3055729675574597004

    # Swap in a mock catalog so any commit attempt can be detected.
    table_v2.catalog = MagicMock()

    # The tag references the snapshot under test; the branch references another one.
    protecting_refs = {
        "tag1": MagicMock(snapshot_id=tagged_snapshot_id, snapshot_ref_type="tag"),
        "main": MagicMock(snapshot_id=other_snapshot_id, snapshot_ref_type="branch"),
    }
    table_v2.metadata = table_v2.metadata.model_copy(update={"refs": protecting_refs})

    # Sanity-check the fixture: the snapshot really is referenced.
    referenced_ids = {ref.snapshot_id for ref in table_v2.metadata.refs.values()}
    assert tagged_snapshot_id in referenced_ids

    expected_message = f"Snapshot with ID {tagged_snapshot_id} is protected and cannot be expired."
    with pytest.raises(ValueError, match=expected_message):
        table_v2.expire_snapshots().expire_snapshot_by_id(tagged_snapshot_id).commit()

    table_v2.catalog.commit_table.assert_not_called()


def test_expire_unprotected_snapshot(table_v2: Table) -> None:
    """A snapshot that no branch or tag references can be expired with a single commit."""
    expired_snapshot_id = 3051729675574597004
    kept_snapshot_id = 3055729675574597004

    # Canned catalog response: metadata whose snapshot list holds only the kept ID.
    # It is built from the table's metadata before the refs are replaced below.
    metadata_after_expiry = table_v2.metadata.model_copy(update={"snapshots": [kept_snapshot_id]})
    canned_response = CommitTableResponse(
        metadata=metadata_after_expiry,
        metadata_location="mock://metadata/location",
        uuid=uuid4(),
    )
    table_v2.catalog = MagicMock()
    table_v2.catalog.commit_table.return_value = canned_response

    # Both refs point at the kept snapshot, so nothing protects the one to expire.
    refs_on_kept_snapshot = {
        "main": MagicMock(snapshot_id=kept_snapshot_id, snapshot_ref_type="branch"),
        "tag1": MagicMock(snapshot_id=kept_snapshot_id, snapshot_ref_type="tag"),
    }
    table_v2.metadata = table_v2.metadata.model_copy(update={"refs": refs_on_kept_snapshot})

    # Sanity-check the fixture: no ref references the snapshot to expire.
    referenced_ids = {ref.snapshot_id for ref in table_v2.metadata.refs.values()}
    assert expired_snapshot_id not in referenced_ids

    table_v2.expire_snapshots().expire_snapshot_by_id(expired_snapshot_id).commit()

    table_v2.catalog.commit_table.assert_called_once()
    remaining_snapshots = table_v2.metadata.snapshots
    assert expired_snapshot_id not in remaining_snapshots
    assert len(table_v2.metadata.snapshots) == 1


def test_expire_nonexistent_snapshot_raises(table_v2: Table) -> None:
    """Expiring an ID that matches no snapshot must raise and must not reach the catalog."""
    missing_snapshot_id = 9999999999999999999

    # Swap in a mock catalog so any commit attempt can be detected.
    table_v2.catalog = MagicMock()
    # Drop all refs so the only possible failure is the missing snapshot.
    metadata_without_refs = table_v2.metadata.model_copy(update={"refs": {}})
    table_v2.metadata = metadata_without_refs

    expected_message = f"Snapshot with ID {missing_snapshot_id} does not exist."
    with pytest.raises(ValueError, match=expected_message):
        table_v2.expire_snapshots().expire_snapshot_by_id(missing_snapshot_id).commit()

    table_v2.catalog.commit_table.assert_not_called()