diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index 7dee94fc356..dddc36b426d 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -21,6 +21,23 @@ logger = logging.getLogger(__name__) +def _prepare_data(data: Any) -> Any: + """Prepare data for json serialization. + + We will convert Python dataclasses, pydantic models or event source data classes to a dict, + otherwise return data as-is. + """ + if hasattr(data, "__dataclass_fields__"): + import dataclasses + + return dataclasses.asdict(data) + + if callable(getattr(data, "dict", None)): + return data.dict() + + return getattr(data, "raw_event", data) + + class IdempotencyHandler: """ Base class to orchestrate calls to persistence layer. @@ -52,7 +69,7 @@ def __init__( Function keyword arguments """ self.function = function - self.data = function_payload + self.data = _prepare_data(function_payload) self.fn_args = function_args self.fn_kwargs = function_kwargs diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index 8f2b30d289a..b07662e6432 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -1,7 +1,6 @@ """ Persistence layers supporting idempotency """ - import datetime import hashlib import json @@ -226,7 +225,6 @@ def _generate_hash(self, data: Any) -> str: Hashed representation of the provided data """ - data = getattr(data, "raw_event", data) # could be a data class depending on decorator order hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode()) return hashed_data.hexdigest() diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 18a99b53999..1f4260a24b7 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -124,45 +124,50 @@ You can quickly start by initializing the `DynamoDBPersistenceLayer` class and u Similar to [idempotent decorator](#idempotent-decorator), you can use `idempotent_function` decorator for any synchronous Python function. -When using `idempotent_function`, you must tell us which keyword parameter in your function signature has the data we should use via **`data_keyword_argument`** - Such data must be JSON serializable. +When using `idempotent_function`, you must tell us which keyword parameter in your function signature has the data we should use via **`data_keyword_argument`**. + +!!! info "We support JSON serializable data, [Python Dataclasses](https://docs.python.org/3.7/library/dataclasses.html){target="_blank"}, [Parser/Pydantic Models](parser.md){target="_blank"}, and our [Event Source Data Classes](./data_classes.md){target="_blank"}." !!! warning "Make sure to call your decorated function using keyword arguments" -=== "app.py" +=== "batch_sample.py" This example also demonstrates how you can integrate with [Batch utility](batch.md), so you can process each record in an idempotent manner. - ```python hl_lines="4 13 18 25" - import uuid - - from aws_lambda_powertools.utilities.batch import sqs_batch_processor - from aws_lambda_powertools.utilities.idempotency import idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig + ```python hl_lines="4-5 16 21 29" + from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType, + batch_processor) + from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + from aws_lambda_powertools.utilities.idempotency import ( + DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) + processor = BatchProcessor(event_type=EventType.SQS) dynamodb = DynamoDBPersistenceLayer(table_name="idem") config = IdempotencyConfig( - event_key_jmespath="messageId", # see "Choosing a payload subset for idempotency" section + event_key_jmespath="messageId", # see Choosing a payload subset section use_local_cache=True, ) - @idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) - def dummy(arg_one, arg_two, data: dict, **kwargs): - return {"data": data} - @idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb) - def record_handler(record): + def record_handler(record: SQSRecord): return {"message": record["body"]} - @sqs_batch_processor(record_handler=record_handler) + @idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) + def dummy(arg_one, arg_two, data: dict, **kwargs): + return {"data": data} + + + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): # `data` parameter must be called as a keyword argument to work dummy("hello", "universe", data="test") - return {"statusCode": 200} + return processor.response() ``` -=== "Example event" +=== "Batch event" ```json hl_lines="4" { @@ -193,6 +198,79 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo } ``` +=== "dataclass_sample.py" + + ```python hl_lines="3-4 23 32" + from dataclasses import dataclass + + from aws_lambda_powertools.utilities.idempotency import ( + DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) + + dynamodb = DynamoDBPersistenceLayer(table_name="idem") + config = IdempotencyConfig( + event_key_jmespath="order_id", # see Choosing a payload subset section + use_local_cache=True, + ) + + @dataclass + class OrderItem: + sku: str + description: str + + @dataclass + class Order: + item: OrderItem + order_id: int + + + @idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) + def process_order(order: Order): + return f"processed order {order.order_id}" + + + order_item = OrderItem(sku="fake", description="sample") + order = Order(item=order_item, order_id="fake-id") + + # `order` parameter must be called as a keyword argument to work + process_order(order=order) + ``` + +=== "parser_pydantic_sample.py" + + ```python hl_lines="1-2 22 31" + from aws_lambda_powertools.utilities.idempotency import ( + DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) + from aws_lambda_powertools.utilities.parser import BaseModel + + dynamodb = DynamoDBPersistenceLayer(table_name="idem") + config = IdempotencyConfig( + event_key_jmespath="order_id", # see Choosing a payload subset section + use_local_cache=True, + ) + + + class OrderItem(BaseModel): + sku: str + description: str + + + class Order(BaseModel): + item: OrderItem + order_id: int + + + @idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) + def process_order(order: Order): + return f"processed order {order.order_id}" + + + order_item = OrderItem(sku="fake", description="sample") + order = Order(item=order_item, order_id="fake-id") + + # `order` parameter must be called as a keyword argument to work + process_order(order=order) + ``` + ### Choosing a payload subset for idempotency !!! tip "Dealing with always changing payloads" @@ -209,7 +287,7 @@ Imagine the function executes successfully, but the client never receives the re !!! warning "Idempotency for JSON payloads" The payload extracted by the `event_key_jmespath` is treated as a string by default, so will be sensitive to differences in whitespace even when the JSON payload itself is identical. - To alter this behaviour, we can use the [JMESPath built-in function](jmespath_functions.md#powertools_json-function) `powertools_json()` to treat the payload as a JSON object rather than a string. + To alter this behaviour, we can use the [JMESPath built-in function](jmespath_functions.md#powertools_json-function) `powertools_json()` to treat the payload as a JSON object (dict) rather than a string. === "payment.py" diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py index 0f74d503b88..a6bcf072a82 100644 --- a/tests/functional/idempotency/conftest.py +++ b/tests/functional/idempotency/conftest.py @@ -1,5 +1,4 @@ import datetime -import hashlib import json from collections import namedtuple from decimal import Decimal @@ -11,20 +10,15 @@ from botocore.config import Config from jmespath import functions -from aws_lambda_powertools.shared.json_encoder import Encoder from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer from aws_lambda_powertools.utilities.idempotency.idempotency import IdempotencyConfig from aws_lambda_powertools.utilities.jmespath_utils import extract_data_from_envelope from aws_lambda_powertools.utilities.validation import envelopes -from tests.functional.utils import load_event +from tests.functional.utils import hash_idempotency_key, json_serialize, load_event TABLE_NAME = "TEST_TABLE" -def serialize(data): - return json.dumps(data, sort_keys=True, cls=Encoder) - - @pytest.fixture(scope="module") def config() -> Config: return Config(region_name="us-east-1") @@ -66,12 +60,12 @@ def lambda_response(): @pytest.fixture(scope="module") def serialized_lambda_response(lambda_response): - return serialize(lambda_response) + return json_serialize(lambda_response) @pytest.fixture(scope="module") def deserialized_lambda_response(lambda_response): - return json.loads(serialize(lambda_response)) + return json.loads(json_serialize(lambda_response)) @pytest.fixture @@ -150,7 +144,7 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali def hashed_idempotency_key(lambda_apigw_event, default_jmespath, lambda_context): compiled_jmespath = jmespath.compile(default_jmespath) data = compiled_jmespath.search(lambda_apigw_event) - return "test-func.lambda_handler#" + hashlib.md5(serialize(data).encode()).hexdigest() + return "test-func.lambda_handler#" + hash_idempotency_key(data) @pytest.fixture @@ -158,12 +152,12 @@ def hashed_idempotency_key_with_envelope(lambda_apigw_event): event = extract_data_from_envelope( data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={} ) - return "test-func.lambda_handler#" + hashlib.md5(serialize(event).encode()).hexdigest() + return "test-func.lambda_handler#" + hash_idempotency_key(event) @pytest.fixture def hashed_validation_key(lambda_apigw_event): - return hashlib.md5(serialize(lambda_apigw_event["requestContext"]).encode()).hexdigest() + return hash_idempotency_key(lambda_apigw_event["requestContext"]) @pytest.fixture diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index 51e142bfa55..0ed2cfcfb59 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -1,6 +1,4 @@ import copy -import hashlib -import json import sys from hashlib import md5 from unittest.mock import MagicMock @@ -8,9 +6,11 @@ import jmespath import pytest from botocore import stub +from pydantic import BaseModel from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig +from aws_lambda_powertools.utilities.idempotency.base import _prepare_data from aws_lambda_powertools.utilities.idempotency.exceptions import ( IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError, @@ -22,12 +22,18 @@ from aws_lambda_powertools.utilities.idempotency.idempotency import idempotent, idempotent_function from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer, DataRecord from aws_lambda_powertools.utilities.validation import envelopes, validator -from tests.functional.idempotency.conftest import serialize -from tests.functional.utils import load_event +from tests.functional.utils import hash_idempotency_key, json_serialize, load_event TABLE_NAME = "TEST_TABLE" +def get_dataclasses_lib(): + """Python 3.6 doesn't support dataclasses natively""" + import dataclasses + + return dataclasses + + # Using parametrize to run test twice, with two separate instances of persistence store. One instance with caching # enabled, and one without. @pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) @@ -744,7 +750,7 @@ def test_default_no_raise_on_missing_idempotency_key( hashed_key = persistence_store._get_hashed_idempotency_key({}) # THEN return the hash of None - expected_value = f"test-func.{function_name}#" + md5(serialize(None).encode()).hexdigest() + expected_value = f"test-func.{function_name}#" + md5(json_serialize(None).encode()).hexdigest() assert expected_value == hashed_key @@ -788,7 +794,7 @@ def test_jmespath_with_powertools_json( expected_value = [sub_attr_value, static_pk_value] api_gateway_proxy_event = { "requestContext": {"authorizer": {"claims": {"sub": sub_attr_value}}}, - "body": serialize({"id": static_pk_value}), + "body": json_serialize({"id": static_pk_value}), } # WHEN calling _get_hashed_idempotency_key @@ -872,9 +878,7 @@ def _delete_record(self, data_record: DataRecord) -> None: def test_idempotent_lambda_event_source(lambda_context): # Scenario to validate that we can use the event_source decorator before or after the idempotent decorator mock_event = load_event("apiGatewayProxyV2Event.json") - persistence_layer = MockPersistenceLayer( - "test-func.lambda_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + persistence_layer = MockPersistenceLayer("test-func.lambda_handler#" + hash_idempotency_key(mock_event)) expected_result = {"message": "Foo"} # GIVEN an event_source decorator @@ -894,9 +898,8 @@ def lambda_handler(event, _): def test_idempotent_function(): # Scenario to validate we can use idempotent_function with any function mock_event = {"data": "value"} - persistence_layer = MockPersistenceLayer( - "test-func.record_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record") @@ -913,9 +916,8 @@ def test_idempotent_function_arbitrary_args_kwargs(): # Scenario to validate we can use idempotent_function with a function # with an arbitrary number of args and kwargs mock_event = {"data": "value"} - persistence_layer = MockPersistenceLayer( - "test-func.record_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record") @@ -930,9 +932,8 @@ def record_handler(arg_one, arg_two, record, is_record): def test_idempotent_function_invalid_data_kwarg(): mock_event = {"data": "value"} - persistence_layer = MockPersistenceLayer( - "test-func.record_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} keyword_argument = "payload" @@ -949,9 +950,8 @@ def record_handler(record): def test_idempotent_function_arg_instead_of_kwarg(): mock_event = {"data": "value"} - persistence_layer = MockPersistenceLayer( - "test-func.record_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} keyword_argument = "record" @@ -969,18 +969,15 @@ def record_handler(record): def test_idempotent_function_and_lambda_handler(lambda_context): # Scenario to validate we can use both idempotent_function and idempotent decorators mock_event = {"data": "value"} - persistence_layer = MockPersistenceLayer( - "test-func.record_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record") def record_handler(record): return expected_result - persistence_layer = MockPersistenceLayer( - "test-func.lambda_handler#" + hashlib.md5(serialize(mock_event).encode()).hexdigest() - ) + persistence_layer = MockPersistenceLayer("test-func.lambda_handler#" + hash_idempotency_key(mock_event)) @idempotent(persistence_store=persistence_layer) def lambda_handler(event, _): @@ -1001,18 +998,16 @@ def test_idempotent_data_sorting(): # Scenario to validate same data in different order hashes to the same idempotency key data_one = {"data": "test message 1", "more_data": "more data 1"} data_two = {"more_data": "more data 1", "data": "test message 1"} - + idempotency_key = "test-func.dummy#" + hash_idempotency_key(data_one) # Assertion will happen in MockPersistenceLayer - persistence_layer = MockPersistenceLayer( - "test-func.dummy#" + hashlib.md5(json.dumps(data_one).encode()).hexdigest() - ) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) # GIVEN @idempotent_function(data_keyword_argument="payload", persistence_store=persistence_layer) def dummy(payload): return {"message": "hello"} - # WHEN + # WHEN/THEN assertion will happen at MockPersistenceLayer dummy(payload=data_two) @@ -1069,3 +1064,87 @@ def test_invalid_dynamodb_persistence_layer(): ) # and raise a ValueError assert str(ve.value) == "key_attr [id] and sort_key_attr [id] cannot be the same!" + + +@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7 or higher for dataclasses") +def test_idempotent_function_dataclasses(): + # Scenario _prepare_data should convert a python dataclasses to a dict + dataclasses = get_dataclasses_lib() + + @dataclasses.dataclass + class Foo: + name: str + + expected_result = {"name": "Bar"} + data = Foo(name="Bar") + as_dict = _prepare_data(data) + assert as_dict == dataclasses.asdict(data) + assert as_dict == expected_result + + +def test_idempotent_function_pydantic(): + # Scenario _prepare_data should convert a pydantic to a dict + class Foo(BaseModel): + name: str + + expected_result = {"name": "Bar"} + data = Foo(name="Bar") + as_dict = _prepare_data(data) + assert as_dict == data.dict() + assert as_dict == expected_result + + +@pytest.mark.parametrize("data", [None, "foo", ["foo"], 1, True, {}]) +def test_idempotent_function_other(data): + # All other data types should be left as is + assert _prepare_data(data) == data + + +@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7 or higher for dataclasses") +def test_idempotent_function_dataclass_with_jmespath(): + # GIVEN + dataclasses = get_dataclasses_lib() + config = IdempotencyConfig(event_key_jmespath="transaction_id", use_local_cache=True) + mock_event = {"customer_id": "fake", "transaction_id": "fake-id"} + idempotency_key = "test-func.collect_payment#" + hash_idempotency_key(mock_event["transaction_id"]) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) + + @dataclasses.dataclass + class Payment: + customer_id: str + transaction_id: str + + @idempotent_function(data_keyword_argument="payment", persistence_store=persistence_layer, config=config) + def collect_payment(payment: Payment): + return payment.transaction_id + + # WHEN + payment = Payment(**mock_event) + result = collect_payment(payment=payment) + + # THEN idempotency key assertion happens at MockPersistenceLayer + assert result == payment.transaction_id + + +@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7 or higher for dataclasses") +def test_idempotent_function_pydantic_with_jmespath(): + # GIVEN + config = IdempotencyConfig(event_key_jmespath="transaction_id", use_local_cache=True) + mock_event = {"customer_id": "fake", "transaction_id": "fake-id"} + idempotency_key = "test-func.collect_payment#" + hash_idempotency_key(mock_event["transaction_id"]) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) + + class Payment(BaseModel): + customer_id: str + transaction_id: str + + @idempotent_function(data_keyword_argument="payment", persistence_store=persistence_layer, config=config) + def collect_payment(payment: Payment): + return payment.transaction_id + + # WHEN + payment = Payment(**mock_event) + result = collect_payment(payment=payment) + + # THEN idempotency key assertion happens at MockPersistenceLayer + assert result == payment.transaction_id diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 703f21744e2..5f1f21afc51 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -1,8 +1,11 @@ import base64 +import hashlib import json from pathlib import Path from typing import Any +from aws_lambda_powertools.shared.json_encoder import Encoder + def load_event(file_name: str) -> Any: path = Path(str(Path(__file__).parent.parent) + "/events/" + file_name) @@ -15,3 +18,12 @@ def str_to_b64(data: str) -> str: def b64_to_str(data: str) -> str: return base64.b64decode(data.encode()).decode("utf-8") + + +def json_serialize(data): + return json.dumps(data, sort_keys=True, cls=Encoder) + + +def hash_idempotency_key(data: Any): + """Serialize data to JSON, encode, and hash it for idempotency key""" + return hashlib.md5(json_serialize(data).encode()).hexdigest()