Description
Expected Behaviour
The decorator idempotent_function
should be thread-safe. In particular, consumers of the library should not have to worry about threading issues in the underlying library.
Specifically, the code snippet provided below should run without errors.
Current Behaviour
AWS Powertools instantiates boto3 client objects concurrently. While boto3 client objects are thread-safe, their instantiation is not.
Hence, we see exceptions like the following:
Traceback (most recent call last):
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\base.py", line 106, in _process_idempotency
self.persistence_store.save_inprogress(
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\base.py", line 370, in save_inprogress
self._put_record(data_record=data_record)
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\dynamodb.py", line 218, in _put_record
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\dynamodb.py", line 108, in table
ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
File "ROOTidempotency_problem\venv\lib\site-packages\boto3\session.py", line 446, in resource
client = self.client(
File "ROOTidempotency_problem\venv\lib\site-packages\boto3\session.py", line 299, in client
return self._session.create_client(
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 953, in create_client
endpoint_resolver = self._get_internal_component('endpoint_resolver')
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 812, in _get_internal_component
return self._internal_components.get_component(name)
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 1112, in get_component
del self._deferred[name]
KeyError: 'endpoint_resolver'
Code snippet
# Two threads, reliably reproducing the issue with 1.31.1 as well as the latest version (2.2.0):
import functools
import inspect
import threading
import time
from dataclasses import dataclass
from enum import Enum
from threading import Thread
from typing import Any
from typing import Callable
from typing import Optional

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer as OriginalDynamoDBPersistenceLayer
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent_function
from botocore.config import Config
# DynamoDB table name; passed as table_name when the persistence layer is constructed.
idempotentTableName: str = 'trading_idempotency'
class DynamoDBPersistenceLayer(OriginalDynamoDBPersistenceLayer):
    """Local subclass of the Powertools DynamoDB persistence layer.

    The only addition is a class-level ``_instance`` attribute. The singleton
    ``__new__`` that would populate it is disabled (kept below as a comment),
    so every construction yields a fresh object.
    """

    _instance = None

    # Disabled singleton constructor, kept for reference:
    # def __new__(cls, *_, **__):
    #     if cls._instance is None:
    #         cls._instance = super(DynamoDBPersistenceLayer, cls).__new__(cls)
    #     return cls._instance
def mapArgsToKwargs(function: Callable, args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    """Fold a call's positional arguments into keyword form.

    Assumes ``args`` and ``kwargs`` together form a valid invocation of
    ``function``. Positional values are bound, in order, to the function's
    parameter names; any parameter that declares a default and is still
    missing afterwards is filled in with that default.

    Returns:
        A pair ``((), merged)`` where the positional part is always empty and
        ``merged`` is a new dict (``kwargs`` itself is not modified).
    """
    parameters = inspect.signature(function).parameters
    merged: dict = kwargs.copy()

    # zip stops at the shorter input, so only as many parameter names as
    # there are positional values get bound here.
    for name, value in zip(parameters, args):
        merged[name] = value

    # Back-fill declared defaults for anything the caller left out.
    for name, parameter in parameters.items():
        if parameter.default is inspect.Parameter.empty:
            continue
        if name not in merged:
            merged[name] = parameter.default

    return (), merged
def retryFunction(
    func: Callable,
    args: tuple = (),
    kwargs: Optional[dict] = None,
    maxRetries: int = 5,
    sleepFactor: float = 1,
) -> Any:
    """Call ``func`` until it succeeds or ``maxRetries`` attempts have failed.

    Before attempt number ``i`` (0-based) the function sleeps
    ``i * sleepFactor`` seconds, so the first attempt runs immediately. After
    every failed attempt it prints a diagnostic line and sleeps a further
    0.1 seconds.

    Args:
        func: Callable to invoke.
        args: Positional arguments forwarded to ``func``.
        kwargs: Keyword arguments forwarded to ``func``; ``None`` means none.
        maxRetries: Maximum number of attempts; must be at least 1.
        sleepFactor: Seconds of extra delay added per previously failed attempt.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``maxRetries`` is less than 1.
        Exception: The exception raised by the last failed attempt.
    """
    # Without this guard the loop body never runs and there is no exception
    # to re-raise at the end.
    if maxRetries < 1:
        raise ValueError(f'maxRetries must be at least 1, got {maxRetries}')

    # A None default avoids sharing one dict object across calls.
    if kwargs is None:
        kwargs = {}

    exception: Optional[Exception] = None
    retry: int = 0
    while retry < maxRetries:
        time.sleep(retry * sleepFactor)
        try:
            return func(*args, **kwargs)
        except Exception as e:
            exception = e
            retry += 1
            message: str = f'function "{func.__qualname__}" with args {args} and kwargs {kwargs} failed. retry count at {retry}: '
            print(message + str(e))
            time.sleep(0.1)
    raise exception
# botocore configuration that fixes the region used by the persistence layer.
region_config = Config(region_name='us-east-1')

# Single module-level persistence layer instance.
dynamo: DynamoDBPersistenceLayer = DynamoDBPersistenceLayer(
    table_name=idempotentTableName,
    boto_config=region_config,
)
def idempotentFunction(cacheKey: str, expectedDurationSeconds: int = 5, expiresAfterSeconds: int = 60 * 5):
    """Build a decorator that routes every call through ``idempotent_function``.

    Args:
        cacheKey: Name of the wrapped function's parameter that carries the
            idempotency payload; that object's ``__dict__`` is hashed and logged.
        expectedDurationSeconds: Not used by the active code path; only the
            disabled retry call inside the wrapper referenced it.
        expiresAfterSeconds: Forwarded to ``IdempotencyConfig`` as
            ``expires_after_seconds``.

    Returns:
        A decorator producing a wrapper with the wrapped function's metadata.
    """

    def decorator(function: Callable):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            # Normalise the call to keyword form so the payload can be found by name.
            positional, kwargsNew = mapArgsToKwargs(function, args, kwargs)

            idempotencyConfig: IdempotencyConfig = IdempotencyConfig(expires_after_seconds=expiresAfterSeconds)
            # The module-level persistence layer is configured again on every call.
            dynamo.configure(config=idempotencyConfig, function_name=function.__qualname__)
            hashKey: str = dynamo._get_hashed_idempotency_key(kwargsNew[cacheKey].__dict__)

            guarded: Callable = idempotent_function(
                function,
                data_keyword_argument=cacheKey,
                persistence_store=dynamo,
                config=idempotencyConfig,
            )
            print(f'invoking idempotent function {function.__module__}.{function.__qualname__}, {hashKey} , with data {kwargsNew[cacheKey].__dict__}')

            # Disabled alternative that went through a retry helper:
            # returnValue = helper.retryFunction(functionToInvoke, args=argsNew, kwargs=kwargsNew, sleepFactor=expectedDurationSeconds)
            return guarded(*positional, **kwargsNew)

        return wrapper

    return decorator
class Side(str, Enum):
    """Direction of a trade order; each member is also a plain ``str``."""

    BUY = 'buy'
    SELL = 'sell'
@dataclass
class TradeOrder:
    """A single trade order.

    ``limitPrice`` and ``stopPrice`` are optional and default to ``None``.
    """

    isin: str
    quantity: int
    side: Side
    # Both prices default to None, so the annotation has to admit None.
    limitPrice: Optional[float] = None
    stopPrice: Optional[float] = None

    def __str__(self) -> str:
        """Return a one-line, human-readable summary of the order."""
        return f'isin: {self.isin}, quantity {self.quantity}, type {self.side.value}, optional limit/stop price {self.limitPrice}/{self.stopPrice}'
# Two buy orders that differ in ISIN and quantity.
t1 = TradeOrder(isin='US7140461093', quantity=200, side=Side.BUY)
t2 = TradeOrder(isin='US7140461091', quantity=201, side=Side.BUY)
@idempotentFunction(cacheKey='tradeOrder')
def myFancyFunction(tradeOrder: TradeOrder):
    """Stand-in workload: print the order, block for five seconds, return ``'data'``."""
    print(tradeOrder)
    # The sleep keeps the call running for a while after the order is printed.
    time.sleep(5)
    return 'data'
def invoke(t: TradeOrder):
    """Thread entry point: run ``myFancyFunction`` for one order and discard its result."""
    myFancyFunction(t)
# One worker per order; all workers are started before any is joined.
threads = [Thread(target=invoke, args=(order,)) for order in (t1, t2)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()
Possible Solution
Simply instantiating the client as part of the persistence layer itself, instead of re-instantiating the client upon access of the property, mitigates the problem to a large extent: the chance of collisions is greatly reduced. Moreover, if thread instantiation itself is done sequentially, this already mitigates the problem entirely.
This is a patch for DynamoDBPersistenceLayer:
Index: venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py b/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
--- a/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
+++ b/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py (date 1668465118921)
@@ -95,6 +95,7 @@
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
+ self.ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
super(DynamoDBPersistenceLayer, self).__init__()
@property
@@ -105,8 +106,8 @@
"""
if self._table:
return self._table
- ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
- self._table = ddb_resource.Table(self.table_name)
+
+ self._table = self.ddb_resource.Table(self.table_name)
return self._table
@table.setter
Steps to Reproduce
(see code snippet above - "minimal breaking example" )
AWS Lambda Powertools for Python version
latest
AWS Lambda function runtime
3.9
Packaging format used
PyPi
Debugging logs
No response