-
Notifications
You must be signed in to change notification settings - Fork 429
feat: Idempotency helper utility #245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 35 commits
d503fb0
e564c63
c4d19ba
45d384b
13e5b09
1efc27d
546e879
60fd336
ee46124
acab091
2a364fd
4f5d52b
a19d955
0ef52f9
d128b0a
4caa52c
d89fcee
ed9e0c2
7000927
c4856fd
aed4a7b
3b6c2e3
2047d34
8a054cb
523535f
834db1c
24f6187
41d559e
dca02ee
b4490b9
43b72e7
88e983e
d17275d
83d78ce
4bdfdf6
9de6e29
a4cc61a
978a6bb
1fd8b5a
022739e
8a2d4fe
e6f2d98
4aa8145
c54952c
7832e56
fed58fe
b079387
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from collections import OrderedDict | ||
|
||
|
||
class LRUDict(OrderedDict): | ||
def __init__(self, max_items=1024, *args, **kwds): | ||
to-mc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.max_items = max_items | ||
super().__init__(*args, **kwds) | ||
to-mc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def __getitem__(self, key): | ||
value = super().__getitem__(key) | ||
self.move_to_end(key) | ||
return value | ||
|
||
def __setitem__(self, key, value): | ||
if key in self: | ||
self.move_to_end(key) | ||
super().__setitem__(key, value) | ||
if len(self) > self.max_items: | ||
oldest = next(iter(self)) | ||
del self[oldest] | ||
|
||
def get(self, key, *args, **kwargs): | ||
item = super(LRUDict, self).get(key, *args, **kwargs) | ||
if item: | ||
self.move_to_end(key=key) | ||
return item |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
""" | ||
Utility for adding idempotency to lambda functions | ||
""" | ||
|
||
from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer | ||
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer | ||
|
||
from .idempotency import idempotent | ||
|
||
__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
""" | ||
Idempotency errors | ||
""" | ||
|
||
|
||
class IdempotencyItemAlreadyExistsError(Exception): | ||
""" | ||
Item attempting to be inserted into persistence store already exists and is not expired | ||
""" | ||
|
||
|
||
class IdempotencyItemNotFoundError(Exception): | ||
""" | ||
Item does not exist in persistence store | ||
""" | ||
|
||
|
||
class IdempotencyAlreadyInProgressError(Exception): | ||
""" | ||
Execution with idempotency key is already in progress | ||
""" | ||
|
||
|
||
class IdempotencyInvalidStatusError(Exception): | ||
""" | ||
An invalid status was provided | ||
""" | ||
|
||
|
||
class IdempotencyValidationError(Exception): | ||
""" | ||
Payload does not match stored idempotency record | ||
""" | ||
|
||
|
||
class IdempotencyInconsistentStateError(Exception): | ||
""" | ||
State is inconsistent across multiple requests to persistence store | ||
""" | ||
|
||
|
||
class IdempotencyPersistenceLayerError(Exception): | ||
""" | ||
Unrecoverable error from the data store | ||
""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
""" | ||
Primary interface for idempotent Lambda functions utility | ||
""" | ||
import logging | ||
from typing import Any, Callable, Dict, Optional | ||
|
||
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator | ||
from aws_lambda_powertools.utilities.idempotency.exceptions import ( | ||
IdempotencyAlreadyInProgressError, | ||
IdempotencyInconsistentStateError, | ||
IdempotencyItemAlreadyExistsError, | ||
IdempotencyItemNotFoundError, | ||
IdempotencyPersistenceLayerError, | ||
IdempotencyValidationError, | ||
) | ||
from aws_lambda_powertools.utilities.idempotency.persistence.base import ( | ||
STATUS_CONSTANTS, | ||
BasePersistenceLayer, | ||
DataRecord, | ||
) | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@lambda_handler_decorator | ||
def idempotent( | ||
handler: Callable[[Any, LambdaContext], Any], | ||
event: Dict[str, Any], | ||
context: LambdaContext, | ||
persistence_store: BasePersistenceLayer, | ||
) -> Any: | ||
""" | ||
Middleware to handle idempotency | ||
|
||
Parameters | ||
---------- | ||
handler: Callable | ||
Lambda's handler | ||
event: Dict | ||
Lambda's Event | ||
context: Dict | ||
Lambda's Context | ||
persistence_store: BasePersistenceLayer | ||
Instance of BasePersistenceLayer to store data | ||
|
||
Examples | ||
-------- | ||
**Processes Lambda's event in an idempotent manner** | ||
>>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer | ||
>>> | ||
>>> persistence_store = DynamoDBPersistenceLayer(event_key="body", table_name="idempotency_store") | ||
to-mc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
>>> | ||
>>> @idempotent(persistence_store=persistence_store) | ||
>>> def handler(event, context): | ||
>>> return {"StatusCode": 200} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Should we explain how it'll work under the hood here? This could help clarify how often this will call DynamoDB, what parameters it'll look at to decide if it's an idempotent request or not, etc. Question: Should we mention how this stores the event into DynamoDB? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if we address this it should be in the documentation. Happy to make changes to the docs based on feedback! |
||
""" | ||
|
||
idempotency_handler = IdempotencyHandler(handler, event, context, persistence_store) | ||
|
||
# IdempotencyInconsistentStateError can happen under rare but expected cases when persistent state changes in the | ||
# small time between put & get requests. In most cases we can retry successfully on this exception. | ||
max_handler_retries = 2 | ||
for i in range(max_handler_retries + 1): | ||
try: | ||
return idempotency_handler.handle() | ||
except IdempotencyInconsistentStateError: | ||
if i < max_handler_retries: | ||
continue | ||
else: | ||
# Allow the exception to bubble up after max retries exceeded | ||
raise | ||
|
||
|
||
class IdempotencyHandler: | ||
""" | ||
Class to orchestrate calls to persistence layer. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
lambda_handler: Callable[[Any, LambdaContext], Any], | ||
event: Dict[str, Any], | ||
context: LambdaContext, | ||
persistence_store: BasePersistenceLayer, | ||
): | ||
""" | ||
Initialize the IdempotencyHandler | ||
|
||
Parameters | ||
---------- | ||
lambda_handler : Callable[[Any, LambdaContext], Any] | ||
Lambda function handler | ||
event : Dict[str, Any] | ||
Event payload lambda handler will be called with | ||
context : LambdaContext | ||
Context object which will be passed to lambda handler | ||
persistence_store : BasePersistenceLayer | ||
Instance of persistence layer to store idempotency records | ||
""" | ||
self.persistence_store = persistence_store | ||
self.context = context | ||
self.event = event | ||
self.lambda_handler = lambda_handler | ||
self.max_handler_retries = 2 | ||
|
||
def handle(self) -> Any: | ||
""" | ||
Main entry point for handling idempotent execution of lambda handler. | ||
|
||
Returns | ||
------- | ||
Any | ||
lambda handler response | ||
|
||
""" | ||
try: | ||
# We call save_inprogress first as an optimization for the most common case where no idempotent record | ||
# already exists. If it succeeds, there's no need to call get_record. | ||
self.persistence_store.save_inprogress(event=self.event) | ||
except IdempotencyItemAlreadyExistsError: | ||
# Now we know the item already exists, we can retrieve it | ||
record = self._get_idempotency_record() | ||
return self._handle_for_status(record) | ||
|
||
return self._call_lambda() | ||
|
||
def _get_idempotency_record(self) -> DataRecord: | ||
""" | ||
Retrieve the idempotency record from the persistence layer. | ||
|
||
Raises | ||
---------- | ||
IdempotencyInconsistentStateError | ||
|
||
""" | ||
try: | ||
event_record = self.persistence_store.get_record(self.event) | ||
except IdempotencyItemNotFoundError: | ||
# This code path will only be triggered if the record is removed between save_inprogress and get_record. | ||
logger.debug( | ||
"An existing idempotency record was deleted before we could retrieve it. Proceeding with lambda " | ||
"handler" | ||
) | ||
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.") | ||
|
||
# Allow this exception to bubble up | ||
except IdempotencyValidationError: | ||
raise | ||
|
||
# Wrap remaining unhandled exceptions with IdempotencyPersistenceLayerError to ease exception handling for | ||
# clients | ||
except Exception as exc: | ||
raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc | ||
|
||
return event_record | ||
|
||
def _handle_for_status(self, event_record: DataRecord) -> Optional[Dict[Any, Any]]: | ||
""" | ||
Take appropriate action based on event_record's status | ||
|
||
Parameters | ||
---------- | ||
event_record: DataRecord | ||
|
||
Returns | ||
------- | ||
Optional[Dict[Any, Any] | ||
Lambda response previously used for this idempotency key, if it has successfully executed already. | ||
|
||
Raises | ||
------ | ||
AlreadyInProgressError | ||
A lambda execution is already in progress | ||
IdempotencyInconsistentStateError | ||
The persistence store reports inconsistent states across different requests. Retryable. | ||
""" | ||
# This code path will only be triggered if the record becomes expired between the save_inprogress call and here | ||
if event_record.status == STATUS_CONSTANTS["EXPIRED"]: | ||
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.") | ||
|
||
if event_record.status == STATUS_CONSTANTS["INPROGRESS"]: | ||
raise IdempotencyAlreadyInProgressError( | ||
to-mc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
f"Execution already in progress with idempotency key: " | ||
f"{self.persistence_store.event_key_jmespath}={event_record.idempotency_key}" | ||
) | ||
|
||
return event_record.response_json_as_dict() | ||
|
||
def _call_lambda(self) -> Any: | ||
to-mc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Call the lambda handler function and update the persistence store appropriate depending on the output | ||
|
||
Returns | ||
------- | ||
Any | ||
lambda handler response | ||
|
||
""" | ||
try: | ||
handler_response = self.lambda_handler(self.event, self.context) | ||
except Exception as handler_exception: | ||
# We need these nested blocks to preserve lambda handler exception in case the persistence store operation | ||
# also raises an exception | ||
try: | ||
self.persistence_store.delete_record(event=self.event, exception=handler_exception) | ||
except Exception as delete_exception: | ||
raise IdempotencyPersistenceLayerError( | ||
"Failed to delete record from idempotency store" | ||
) from delete_exception | ||
raise | ||
|
||
else: | ||
try: | ||
self.persistence_store.save_success(event=self.event, result=handler_response) | ||
except Exception as save_exception: | ||
raise IdempotencyPersistenceLayerError( | ||
"Failed to update record state to success in idempotency store" | ||
) from save_exception | ||
|
||
return handler_response |
Uh oh!
There was an error while loading. Please reload this page.