Skip to content

[Thread-safety] Concurrent instantiation of Boto3 clients #1718

Closed
@dispyfree

Description

@dispyfree

Expected Behaviour

The decorator idempotent_function should be thread-safe. In particular, consumers of the library should not have to worry about threading issues in the underlying library.

Concretely, the code snippet provided below should run without errors.

Current Behaviour

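AWS Lambda Powertools instantiates boto3 clients lazily, so when the decorated function runs in several threads at once, those clients are created concurrently. boto3 client objects are generally thread-safe once created, but creating them concurrently from a shared session is not.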
Hence, we see exceptions like the following:

Traceback (most recent call last):
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\base.py", line 106, in _process_idempotency
self.persistence_store.save_inprogress(
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\base.py", line 370, in save_inprogress
self._put_record(data_record=data_record)
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\dynamodb.py", line 218, in _put_record
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\dynamodb.py", line 108, in table
ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
File "ROOTidempotency_problem\venv\lib\site-packages\boto3\session.py", line 446, in resource
client = self.client(
File "ROOTidempotency_problem\venv\lib\site-packages\boto3\session.py", line 299, in client
return self._session.create_client(
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 953, in create_client
endpoint_resolver = self._get_internal_component('endpoint_resolver')
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 812, in _get_internal_component
return self._internal_components.get_component(name)
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 1112, in get_component
del self._deferred[name]
KeyError: 'endpoint_resolver'

Code snippet

# Two threads, reliably reproducing the issue with 1.31.1 as well as the latest version (2.2.0): 

import functools
import inspect
import threading
import time
from dataclasses import dataclass
from enum import Enum
from threading import Thread
from typing import Any, Callable, Optional

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer as OriginalDynamoDBPersistenceLayer
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent_function
from botocore.config import Config

# DynamoDB table that backs the idempotency persistence layer created further down.
idempotentTableName: str = 'trading_idempotency'


class DynamoDBPersistenceLayer(OriginalDynamoDBPersistenceLayer):
	"""Thin subclass of the Powertools DynamoDB persistence layer.

	It defines no methods of its own; it appears to be kept as a hook for experimenting
	with how persistence-layer instances are created.
	"""

	# Intended to cache one shared instance, but currently unused: a singleton ``__new__``
	# returning ``cls._instance`` was tried and disabled. Such a ``__new__`` alone would not stop
	# ``__init__`` from re-running on each construction, because Python calls ``__init__`` on
	# the instance that ``__new__`` returns.
	_instance = None


def mapArgsToKwargs(function: Callable, args: tuple, kwargs: dict) -> tuple[tuple, dict]:
	"""
	Fold positional arguments into keyword arguments for ``function``.

	Positional values are matched to ``function``'s parameters in declaration order; surplus
	values are ignored. A positional value overrides an explicit keyword of the same name.
	Every parameter that has a default but still has no value afterwards receives that default.
	``kwargs`` itself is not modified.

	Assumption: the call is a valid invocation of ``function``. The signature is not validated,
	so keyword-only and variadic parameters also consume positional values.

	:returns: an empty positional tuple and the merged keyword dictionary.
	"""
	parameters = inspect.signature(function).parameters
	merged: dict = kwargs.copy()
	# zip stops at the shorter side, so surplus positional values are silently dropped.
	merged.update(zip(parameters, args))
	for name, parameter in parameters.items():
		if parameter.default is not inspect.Parameter.empty:
			merged.setdefault(name, parameter.default)
	return (), merged


def retryFunction(func: Callable, args: tuple = (), kwargs: Optional[dict] = None, maxRetries: int = 5, sleepFactor: float = 1) -> Any:
	"""
	Call ``func(*args, **kwargs)``, retrying on any exception with a linear back-off.

	Attempt ``n`` (0-based) is preceded by ``time.sleep(n * sleepFactor)``, so the first attempt
	runs immediately. Each failure is printed together with the running failure count.

	:param func: callable to invoke. It does not need a ``__qualname__``; partials and callable
		objects are logged via ``repr``.
	:param args: positional arguments forwarded to ``func``.
	:param kwargs: keyword arguments forwarded to ``func``. ``None`` means no keyword arguments.
	:param maxRetries: maximum number of attempts. Must be positive.
	:param sleepFactor: seconds of delay added per previous failed attempt.
	:returns: the return value of the first successful call.
	:raises ValueError: if ``maxRetries`` is not positive, because no attempt could be made.
	:raises Exception: the exception from the last attempt, once every attempt has failed.
	"""
	if maxRetries <= 0:
		# Without at least one attempt there is no exception to re-raise
		# (previously this surfaced as an UnboundLocalError).
		raise ValueError(f'maxRetries must be positive, got {maxRetries}')
	if kwargs is None:
		# Fresh dict per call instead of a shared mutable default.
		kwargs = {}
	# Resolve the display name up front. Accessing ``__qualname__`` inside the handler would raise
	# AttributeError for partials/callable objects and mask the real error.
	name: str = getattr(func, '__qualname__', repr(func))
	retry: int = 0
	lastError: Optional[Exception] = None
	while retry < maxRetries:
		time.sleep(retry * sleepFactor)
		try:
			return func(*args, **kwargs)
		except Exception as e:
			lastError = e
			retry += 1
			message: str = f'function "{name}" with args {args} and kwargs {kwargs} failed. retry count at {retry}: '
			print(message + str(e))
	time.sleep(0.1)
	raise lastError


# The AWS region is pinned here explicitly rather than taken from the environment.
region_config = Config(region_name='us-east-1')

# A single module-level persistence layer; every decorated call (and therefore every thread
# started below) uses this same object.
dynamo: DynamoDBPersistenceLayer = DynamoDBPersistenceLayer(
	table_name=idempotentTableName,
	boto_config=region_config,
)

def idempotentFunction(cacheKey: str, expectedDurationSeconds: int = 5, expiresAfterSeconds: int = 60 * 5):
	"""
	Build a decorator that routes calls through Powertools' ``idempotent_function``.

	On every call, the wrapper:

	* folds positional arguments into keywords;
	* reconfigures the shared module-level ``dynamo`` store;
	* logs the key that ``dynamo._get_hashed_idempotency_key`` computes for the ``cacheKey`` argument;
	* invokes a freshly created ``idempotent_function`` wrapper.

	:param cacheKey: name of the parameter whose value is the idempotency payload. Its
		``__dict__`` is what gets passed to the key helper and logged.
	:param expectedDurationSeconds: not used by the active code path. Only a disabled retry
		variant consumed it, as the retry sleep factor.
	:param expiresAfterSeconds: forwarded to ``IdempotencyConfig(expires_after_seconds=...)``.
	:returns: the decorator.
	"""
	def decorator(function: Callable):
		@functools.wraps(function)
		def wrapper(*args, **kwargs):
			positional, keywords = mapArgsToKwargs(function, args, kwargs)
			idempotencyConfig: IdempotencyConfig = IdempotencyConfig(expires_after_seconds=expiresAfterSeconds)

			# NOTE(review): this mutates the module-level `dynamo` store on every call. All threads
			# share that one object, and nothing here serialises access to it.
			dynamo.configure(config=idempotencyConfig, function_name=function.__qualname__)
			payload = keywords[cacheKey].__dict__
			# Private Powertools helper; its result is only used for the log line below.
			hashKey: str = dynamo._get_hashed_idempotency_key(payload)
			guarded: Callable = idempotent_function(
				function,
				data_keyword_argument=cacheKey,
				persistence_store=dynamo,
				config=idempotencyConfig,
			)

			print(f'invoking idempotent function {function.__module__}.{function.__qualname__}, {hashKey} , with data {payload}')
			# A disabled variant wrapped this call as helper.retryFunction(guarded, args=positional,
			# kwargs=keywords, sleepFactor=expectedDurationSeconds).
			return guarded(*positional, **keywords)

		return wrapper

	return decorator


class Side(str, Enum):
	"""Direction of a trade order; the ``str`` mixin makes members compare equal to their values."""

	BUY = 'buy'
	SELL = 'sell'


@dataclass
class TradeOrder:
	"""An order to trade ``quantity`` units of the instrument identified by ``isin``.

	``limitPrice`` and ``stopPrice`` are optional and default to ``None``.
	"""

	isin: str
	quantity: int
	side: Side
	# The defaults are None, so the annotations must admit None explicitly. PEP 484 type
	# checkers no longer infer an implicit Optional from a None default.
	limitPrice: Optional[float] = None
	stopPrice: Optional[float] = None

	def __str__(self) -> str:
		# ``side.value`` assumes ``side`` is an Enum member such as Side.BUY, not a bare string.
		return f'isin: {self.isin}, quantity {self.quantity}, type {self.side.value}, optional limit/stop price {self.limitPrice}/{self.stopPrice}'


# Two orders with distinct payloads, one per worker thread below.
# Side(Side.BUY) just returns Side.BUY, so the member is used directly.
t1 = TradeOrder(isin='US7140461093', quantity=200, side=Side.BUY)
t2 = TradeOrder(isin='US7140461091', quantity=201, side=Side.BUY)


@idempotentFunction(cacheKey='tradeOrder')
def myFancyFunction(tradeOrder: TradeOrder):
	"""Demo workload: print the order, block for five seconds, then return ``'data'``.

	The sleep presumably keeps both threads inside the idempotent section at the same time,
	which widens the window for the race this snippet reproduces.
	"""
	print(tradeOrder)
	time.sleep(5)
	result = 'data'
	return result


def invoke(t: TradeOrder):
	"""Thread target: run ``myFancyFunction`` for one order; its return value is discarded."""
	myFancyFunction(t)
	return None


# One worker per order. All workers are started before any is joined, so they run concurrently.
threads = [Thread(target=invoke, args=(order,)) for order in (t1, t2)]
for thread in threads:
	thread.start()
for thread in threads:
	thread.join()

Possible Solution

Instantiating the DynamoDB resource once in the persistence layer's constructor, instead of lazily when the `table` property is first accessed, mitigates the problem to a large extent, because it greatly reduces the chance of concurrent instantiation. If the persistence layer objects themselves are created sequentially (for example, before any worker threads are started), this removes the problem entirely.
This is a patch for DynamoDBPersistenceLayer:

Index: venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py b/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
--- a/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py	
+++ b/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py	(date 1668465118921)
@@ -95,6 +95,7 @@
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
+        self.ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
        super(DynamoDBPersistenceLayer, self).__init__()

    @property
@@ -105,8 +106,8 @@
        """
        if self._table:
            return self._table
-        ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
-        self._table = ddb_resource.Table(self.table_name)
+
+        self._table = self.ddb_resource.Table(self.table_name)
        return self._table

    @table.setter

Steps to Reproduce

(see code snippet above - "minimal breaking example" )

AWS Lambda Powertools for Python version

latest

AWS Lambda function runtime

3.9

Packaging format used

PyPi

Debugging logs

No response

Metadata

Metadata

Labels

bug (Something isn't working), idempotency (Idempotency utility)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions