Skip to content

feat(idempotency): leverage new DynamoDB Failed conditional writes behavior with ReturnValuesOnConditionCheckFailure #3446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
92d21ec
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency…
Dec 4, 2023
7a8267f
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
Dec 4, 2023
bd92f33
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency…
Dec 4, 2023
0e01865
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 4, 2024
3a7e60a
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 10, 2024
5727c48
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 12, 2024
dd3707d
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency…
Jan 14, 2024
6a67bde
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 15, 2024
887e2e9
Improving code readability
leandrodamascena Jan 15, 2024
88ed83c
Reverting function
leandrodamascena Jan 15, 2024
d688322
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 15, 2024
7cb1b2c
Adding comments about some logic decisions
leandrodamascena Jan 15, 2024
efe99a2
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 15, 2024
d2798ad
Use DynamoDBPersistenceLayer passed in for test_idempotent_lambda_exp…
dastra Jan 16, 2024
a9a598c
Adding docs
leandrodamascena Jan 17, 2024
296e9ee
wording
Jan 17, 2024
cb94a14
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 17, 2024
7c66456
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 17, 2024
deca935
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 18, 2024
488fc01
Adressing Ruben's feedback
leandrodamascena Jan 18, 2024
1d6d944
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 18, 2024
7ec8343
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 19, 2024
85b829c
Adressing Ruben's feedback
leandrodamascena Jan 19, 2024
19e042e
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
STATUS_CONSTANTS,
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
Expand Down Expand Up @@ -118,11 +120,15 @@ def _process_idempotency(self):
data=self.data,
remaining_time_in_millis=self._get_remaining_time_in_millis(),
)
except IdempotencyKeyError:
except (IdempotencyKeyError, IdempotencyValidationError):
raise
except IdempotencyItemAlreadyExistsError:
# Now we know the item already exists, we can retrieve it
record = self._get_idempotency_record()
except IdempotencyItemAlreadyExistsError as exc:
# We now know the item exists
# Attempt to retrieve the record from the exception where ReturnValuesOnConditionCheckFailure is supported
record = exc.old_data_record
if record is None:
# Perform a GET on the record
record = self._get_idempotency_record()
if record is not None:
return self._handle_for_status(record)
except Exception as exc:
Expand Down
16 changes: 16 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from typing import Optional, Union

from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord


class BaseError(Exception):
"""
Expand All @@ -30,6 +32,20 @@ class IdempotencyItemAlreadyExistsError(BaseError):
Item attempting to be inserted into persistence store already exists and is not expired
"""

def __init__(self, *args: Optional[Union[str, Exception]], old_data_record: Optional[DataRecord] = None):
self.message = str(args[0]) if args else ""
self.details = "".join(str(arg) for arg in args[1:]) if args[1:] else None
self.old_data_record = old_data_record

def __str__(self):
"""
Return all arguments formatted or original message
"""
old_data_record = f" from [{(str(self.old_data_record))}]" if self.old_data_record else ""
details = f" - ({self.details})" if self.details else ""

return f"{self.message}{details}{old_data_record}"


class IdempotencyItemNotFoundError(BaseError):
"""
Expand Down
107 changes: 25 additions & 82 deletions aws_lambda_powertools/utilities/idempotency/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os
import warnings
from abc import ABC, abstractmethod
from types import MappingProxyType
from typing import Any, Dict, Optional

import jmespath
Expand All @@ -18,95 +17,18 @@
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyInvalidStatusError,
IdempotencyItemAlreadyExistsError,
IdempotencyKeyError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions

logger = logging.getLogger(__name__)

STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})


class DataRecord:
"""
Data Class for idempotency records.
"""

def __init__(
self,
idempotency_key: str,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: str = "",
payload_hash: str = "",
) -> None:
"""

Parameters
----------
idempotency_key: str
hashed representation of the idempotent data
status: str, optional
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
response data from previous executions using the record
"""
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

@property
def is_expired(self) -> bool:
"""
Check if data record is expired

Returns
-------
bool
Whether the record is currently expired or not
"""
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)

@property
def status(self) -> str:
"""
Get status of data record

Returns
-------
str
"""
if self.is_expired:
return STATUS_CONSTANTS["EXPIRED"]
elif self._status in STATUS_CONSTANTS.values():
return self._status
else:
raise IdempotencyInvalidStatusError(self._status)

def response_json_as_dict(self) -> Optional[dict]:
"""
Get response data deserialized to python dict

Returns
-------
Optional[dict]
previous response data deserialized
"""
return json.loads(self.response_data) if self.response_data else None


class BasePersistenceLayer(ABC):
"""
Expand Down Expand Up @@ -260,6 +182,27 @@ def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> No
if data_record.payload_hash != data_hash:
raise IdempotencyValidationError("Payload does not match stored record for this event key")

def _validate_hashed_payload(self, old_data_record: DataRecord, data_record: DataRecord) -> None:
"""
Validate that the hashed data provided matches the payload_hash stored data record

Parameters
----------
old_data_record: DataRecord
DataRecord instance fetched from Dynamo
data_record: DataRecord
DataRecord instance which failed insert into Dynamo

Raises
----------
IdempotencyValidationError
Payload doesn't match the stored record for the given idempotency key

"""
if self.payload_validation_enabled:
if old_data_record.payload_hash != data_record.payload_hash:
raise IdempotencyValidationError("Hashed payload does not match stored record for this event key")

def _get_expiry_timestamp(self) -> int:
"""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Data Class for idempotency records.
"""

import datetime
import json
import logging
from types import MappingProxyType
from typing import Optional

logger = logging.getLogger(__name__)

STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})


class DataRecord:
"""
Data Class for idempotency records.
"""

def __init__(
self,
idempotency_key: str,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: str = "",
payload_hash: str = "",
) -> None:
"""

Parameters
----------
idempotency_key: str
hashed representation of the idempotent data
status: str, optional
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
response data from previous executions using the record
"""
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

@property
def is_expired(self) -> bool:
"""
Check if data record is expired

Returns
-------
bool
Whether the record is currently expired or not
"""
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)

@property
def status(self) -> str:
"""
Get status of data record

Returns
-------
str
"""
if self.is_expired:
return STATUS_CONSTANTS["EXPIRED"]
if self._status in STATUS_CONSTANTS.values():
return self._status

from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyInvalidStatusError

raise IdempotencyInvalidStatusError(self._status)

def response_json_as_dict(self) -> Optional[dict]:
"""
Get response data deserialized to python dict

Returns
-------
Optional[dict]
previous response data deserialized
"""
return json.loads(self.response_data) if self.response_data else None
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
Expand Down Expand Up @@ -234,16 +235,51 @@ def _put_record(self, data_record: DataRecord) -> None:
":now_in_millis": {"N": str(int(now.timestamp() * 1000))},
":inprogress": {"S": STATUS_CONSTANTS["INPROGRESS"]},
},
**(
{"ReturnValuesOnConditionCheckFailure": "ALL_OLD"} # type: ignore
if self.boto3_supports_condition_check_failure(boto3.__version__)
else {}
),
)
except ClientError as exc:
error_code = exc.response.get("Error", {}).get("Code")
if error_code == "ConditionalCheckFailedException":
old_data_record = self._item_to_data_record(exc.response["Item"]) if "Item" in exc.response else None
if old_data_record is not None:
logger.debug(
f"Failed to put record for already existing idempotency key: "
f"{data_record.idempotency_key} with status: {old_data_record.status}, "
f"expiry_timestamp: {old_data_record.expiry_timestamp}, "
f"and in_progress_expiry_timestamp: {old_data_record.in_progress_expiry_timestamp}",
)
self._save_to_cache(data_record=old_data_record)

try:
self._validate_hashed_payload(old_data_record=old_data_record, data_record=data_record)
except IdempotencyValidationError as ive:
raise ive from exc

raise IdempotencyItemAlreadyExistsError(old_data_record=old_data_record) from exc

logger.debug(
f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}",
)
raise IdempotencyItemAlreadyExistsError from exc
else:
raise
raise IdempotencyItemAlreadyExistsError() from exc

raise

@staticmethod
def boto3_supports_condition_check_failure(boto3_version: str):
version = boto3_version.split(".")
# Only supported in boto3 1.26.164 and above
if len(version) >= 3 and int(version[0]) == 1 and int(version[1]) == 26 and int(version[2]) >= 164:
return True
if len(version) >= 2 and int(version[0]) == 1 and int(version[1]) > 26:
return True
if int(version[0]) > 1:
return True

return False

def _update_record(self, data_record: DataRecord):
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
Expand Down
4 changes: 2 additions & 2 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ If you're not [changing the default configuration for the DynamoDB persistence l

???+ info "Info: DynamoDB"
Each function invocation will generally make 2 requests to DynamoDB. If the
result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation. For retried invocations, you will
see 1WCU and 1RCU. Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to
result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation.
Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to
estimate the cost.

### Idempotent decorator
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ aws-encryption-sdk = { version = "^3.1.1", optional = true }
coverage = {extras = ["toml"], version = "^7.2"}
pytest = "^7.4.4"
black = "^23.3"
boto3 = "^1.18"
boto3 = "^1.26.164"
isort = "^5.11.5"
pytest-cov = "^4.1.0"
pytest-mock = "^3.11.1"
Expand Down
Loading