
Commit 9a3842a

feat: Support different merge conditions in athena.to_iceberg function (#2861)
* added merge_condition to athena.to_iceberg
* fix query string
* fix ruff formatting
1 parent 7492962 commit 9a3842a

File tree

2 files changed: +70, -2 lines changed


awswrangler/athena/_write_iceberg.py

Lines changed: 16 additions & 2 deletions
@@ -213,6 +213,7 @@ def _validate_args(
     mode: Literal["append", "overwrite", "overwrite_partitions"],
     partition_cols: list[str] | None,
     merge_cols: list[str] | None,
+    merge_condition: Literal["update", "ignore"],
 ) -> None:
     if df.empty is True:
         raise exceptions.EmptyDataFrame("DataFrame cannot be empty.")
@@ -232,6 +233,11 @@ def _validate_args(
                 "When mode is 'overwrite_partitions' merge_cols must not be specified."
             )
 
+    if merge_cols and merge_condition not in ["update", "ignore"]:
+        raise exceptions.InvalidArgumentValue(
+            f"Invalid merge_condition: {merge_condition}. Valid values: ['update', 'ignore']"
+        )
+
 
 @apply_configs
 @_utils.validate_distributed_kwargs(
@@ -246,6 +252,7 @@ def to_iceberg(
     table_location: str | None = None,
     partition_cols: list[str] | None = None,
     merge_cols: list[str] | None = None,
+    merge_condition: Literal["update", "ignore"] = "update",
     keep_files: bool = True,
     data_source: str | None = None,
     s3_output: str | None = None,
@@ -292,6 +299,8 @@ def to_iceberg(
         List of column names that will be used for conditional inserts and updates.
 
         https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
+    merge_condition: str, optional
+        The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore'].
     keep_files : bool
         Whether staging files produced by Athena are retained. 'True' by default.
     data_source : str, optional
@@ -376,6 +385,7 @@ def to_iceberg(
         mode=mode,
         partition_cols=partition_cols,
         merge_cols=merge_cols,
+        merge_condition=merge_condition,
     )
 
     glue_table_settings = cast(
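A minimal usage sketch of the new keyword (database, table, and S3 locations are hypothetical): the default merge_condition="update" keeps the previous upsert behaviour, while "ignore" leaves matched rows untouched and only inserts rows with no match on merge_cols.

    import pandas as pd
    import awswrangler as wr

    df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

    # Insert-only merge: rows whose "id" already exists in the Iceberg table
    # are left as-is; rows with new ids are inserted.
    wr.athena.to_iceberg(
        df=df,
        database="my_db",                        # hypothetical
        table="my_table",                        # hypothetical
        table_location="s3://my-bucket/table/",  # hypothetical
        temp_path="s3://my-bucket/tmp/",         # hypothetical
        merge_cols=["id"],
        merge_condition="ignore",
    )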
@@ -497,12 +507,16 @@ def to_iceberg(
     # Insert or merge into Iceberg table
     sql_statement: str
     if merge_cols:
+        if merge_condition == "update":
+            match_condition = f"""WHEN MATCHED THEN
+                UPDATE SET {', '.join([f'"{x}" = source."{x}"' for x in df.columns])}"""
+        else:
+            match_condition = ""
         sql_statement = f"""
             MERGE INTO "{database}"."{table}" target
             USING "{database}"."{temp_table}" source
             ON {' AND '.join([f'target."{x}" = source."{x}"' for x in merge_cols])}
-            WHEN MATCHED THEN
-                UPDATE SET {', '.join([f'"{x}" = source."{x}"' for x in df.columns])}
+            {match_condition}
             WHEN NOT MATCHED THEN
             INSERT ({', '.join([f'"{x}"' for x in df.columns])})
             VALUES ({', '.join([f'source."{x}"' for x in df.columns])})
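To make the branch concrete, this is roughly the statement the f-string renders for a hypothetical table with merge_cols=["id"], DataFrame columns id and value, and a staging table temp_table_xxx (names hypothetical, whitespace trimmed). With merge_condition="update" the MATCHED clause is kept; with "ignore" it renders as an empty string, so matched rows are left untouched:

    -- merge_condition="update"
    MERGE INTO "my_db"."my_table" target
    USING "my_db"."temp_table_xxx" source
    ON target."id" = source."id"
    WHEN MATCHED THEN
        UPDATE SET "id" = source."id", "value" = source."value"
    WHEN NOT MATCHED THEN
    INSERT ("id", "value")
    VALUES (source."id", source."value")

    -- merge_condition="ignore"
    MERGE INTO "my_db"."my_table" target
    USING "my_db"."temp_table_xxx" source
    ON target."id" = source."id"
    WHEN NOT MATCHED THEN
    INSERT ("id", "value")
    VALUES (source."id", source."value")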

tests/unit/test_athena_iceberg.py

Lines changed: 54 additions & 0 deletions
@@ -650,6 +650,60 @@ def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str,
     assert_pandas_equals(df_expected, df_out)
 
 
+def test_athena_to_iceberg_merge_into_ignore(path: str, path2: str, glue_database: str, glue_table: str) -> None:
+    df = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["1984", "1996"], "gross": [35_000_000, 60_000_000]})
+    df["title"] = df["title"].astype("string")
+    df["year"] = df["year"].astype("string")
+    df["gross"] = df["gross"].astype("Int64")
+
+    wr.athena.to_iceberg(
+        df=df,
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        keep_files=False,
+    )
+
+    # Perform MERGE INTO
+    df2 = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["2021", "1996"], "gross": [400_000_000, 60_000_001]})
+    df2["title"] = df2["title"].astype("string")
+    df2["year"] = df2["year"].astype("string")
+    df2["gross"] = df2["gross"].astype("Int64")
+
+    wr.athena.to_iceberg(
+        df=df2,
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        keep_files=False,
+        merge_cols=["title", "year"],
+        merge_condition="ignore",
+    )
+
+    # Expected output
+    df_expected = pd.DataFrame(
+        {
+            "title": ["Dune", "Fargo", "Dune"],
+            "year": ["1984", "1996", "2021"],
+            "gross": [35_000_000, 60_000_000, 400_000_000],
+        }
+    )
+    df_expected["title"] = df_expected["title"].astype("string")
+    df_expected["year"] = df_expected["year"].astype("string")
+    df_expected["gross"] = df_expected["gross"].astype("Int64")
+
+    df_out = wr.athena.read_sql_query(
+        sql=f'SELECT * FROM "{glue_table}" ORDER BY year',
+        database=glue_database,
+        ctas_approach=False,
+        unload_approach=False,
+    )
+
+    assert_pandas_equals(df_expected, df_out)
+
+
 def test_athena_to_iceberg_cols_order(path: str, path2: str, glue_database: str, glue_table: str) -> None:
     kwargs = {
         "database": glue_database,
