Description
Use case
When working with DynamoDB, depending on the action being performed, there are quality-of-life improvements that can be made to simplify the overall experience. For example:
- When calling
GetItem
, automatically populate ExpressionAttributeNames
from a list of attributes - When calling
Query
with Limit
, automatically handle paginating with LastEvaluatedKey
until the desired number of items are returned
- This could also include methods to make defining
KeyCondition
easier and populating ExpressionAttributeNames
and ExpressionAttributeValues
.
- When calling
BatchGetItems
, automatically retry UnprocessedKeys
using exponential backoff with jitter. - Automatically serialize and deserialize DynamoDB objects to and from Python objects
Solution/User Experience
A helper class that implements some of these methods, such as:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import time
from typing import Dict, Any, List, TYPE_CHECKING, Generator, Tuple
from aws_lambda_powertools import Logger
import boto3
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.config import Config
if TYPE_CHECKING:
from mypy_boto3_dynamodb import DynamoDBClient
logger = Logger(child=True)
__all__ = ["DynamoDBHelpers", "TransactionWriter"]
class TransactionWriter:
    """
    Automatically handle building a transaction to DynamoDB.

    Buffers Put/Update/Delete/ConditionCheck requests and sends them with
    ``transact_write_items`` once ``flush_amount`` requests have accumulated.
    Use as a context manager so any remaining buffered requests are flushed
    on exit.
    """

    def __init__(self, table_name: str, client: "DynamoDBClient", flush_amount: int = 100) -> None:
        """
        :param table_name: Name of the DynamoDB table targeted by every request.
        :param client: Low-level DynamoDB client used to send the transactions.
        :param flush_amount: Number of buffered requests that triggers a flush
            (DynamoDB transactions accept at most 100 items).
        """
        self._table_name = table_name
        self._client = client
        self._flush_amount = flush_amount
        self._items_buffer: List[Dict[str, Any]] = []

    def put_item(self, item: Dict[str, Any]) -> None:
        """Buffer a Put request; the caller's dict is not modified."""
        # Build a new dict instead of mutating the caller's argument.
        self._add_request_and_process({"Put": {**item, "TableName": self._table_name}})

    def update_item(self, item: Dict[str, Any]) -> None:
        """Buffer an Update request; the caller's dict is not modified."""
        self._add_request_and_process({"Update": {**item, "TableName": self._table_name}})

    def delete_item(self, item: Dict[str, Any]) -> None:
        """Buffer a Delete request; the caller's dict is not modified."""
        self._add_request_and_process({"Delete": {**item, "TableName": self._table_name}})

    def check_item(self, item: Dict[str, Any]) -> None:
        """Buffer a ConditionCheck request; the caller's dict is not modified."""
        self._add_request_and_process({"ConditionCheck": {**item, "TableName": self._table_name}})

    def _add_request_and_process(self, request: Dict[str, Any]) -> None:
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _flush_if_needed(self) -> None:
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self) -> None:
        # Send at most one transaction's worth of requests per call; the
        # remainder stays buffered for the next flush.
        items_to_send = self._items_buffer[: self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount :]
        self._client.transact_write_items(TransactItems=items_to_send)
        logger.debug(
            f"Transaction write sent {len(items_to_send)}, unprocessed: {len(self._items_buffer)}"
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left until there's nothing left in our items buffer.
        # NOTE(review): this also flushes when the body raised an exception —
        # confirm that partially writing the buffered requests on error is intended.
        while self._items_buffer:
            self._flush()
class DynamoDBHelpers:
    """
    Quality-of-life helpers around a low-level DynamoDB client.

    Provides automatic pagination for queries, batch/transactional reads with
    retry of unprocessed keys, and (de)serialization between plain Python
    values and the DynamoDB wire format.
    """

    # DynamoDB service limits for BatchGetItem / TransactWriteItems.
    MAX_BATCH_GET_SIZE = 100
    MAX_TRANSACTION_WRITE_SIZE = 100
    # Maximum number of retries (botocore config and unprocessed-key loop)
    MAX_RETRIES = 15
    # Minimum sleep time (seconds) used as the backoff unit
    MIN_SLEEP_TIME = 1e-2

    _deserializer = TypeDeserializer()
    _serializer = TypeSerializer()
    _client = None

    def __init__(self, table_name: str, region: str, session: boto3.Session = None) -> None:
        """
        :param table_name: Single table that all helpers operate on.
        :param region: AWS region for the DynamoDB client.
        :param session: Optional boto3 session; defaults to boto3's cached
            default session.
        """
        if not session:
            # NOTE(review): private boto3 API — reuses the shared default
            # session rather than creating a new one per instance; confirm
            # this is preferred over boto3.Session().
            session = boto3._get_default_session()
        config = Config(
            retries={
                "max_attempts": self.MAX_RETRIES,
                "mode": "standard",
            }
        )
        self._client: "DynamoDBClient" = session.client(
            "dynamodb", region_name=region, config=config
        )
        self._table_name = table_name

    @property
    def client(self):
        """The underlying low-level DynamoDB client."""
        return self._client

    def transaction_writer(self) -> TransactionWriter:
        """Create a TransactionWriter bound to this table and client."""
        return TransactionWriter(self._table_name, self._client, self.MAX_TRANSACTION_WRITE_SIZE)

    def paginated_query(
        self, params: Dict[str, Any], page_size: int, acc: List[Dict[str, Any]] = None
    ) -> Tuple[List, bool]:
        """
        Query repeatedly until ``page_size`` items are collected or results run out.

        :param params: Keyword arguments passed straight through to ``Query``.
        :param page_size: Desired number of items for this page.
        :param acc: Items already collected (used when resuming a page).
        :returns: Tuple of (items, flag indicating more results are available).

        @see https://dev.to/andyrichardsonn/graphql-pagination-with-dynamodb-putting-it-together-20mn
        """
        # Iterative rather than recursive so a heavily-filtered query that
        # needs many pages cannot exhaust the interpreter's recursion limit.
        acc = list(acc) if acc else []
        while True:
            logger.debug("paginated_query", params=params, page_size=page_size)
            remaining = page_size - len(acc)
            result = self._client.query(**params)
            new_items = result.get("Items", [])
            acc.extend(new_items[:remaining])
            cursor = result.get("LastEvaluatedKey")
            if not cursor:
                # Table exhausted; over-fetching this page means more exist.
                return acc, len(new_items) > remaining
            if len(acc) >= page_size and len(new_items) > remaining:
                return acc, True
            params = params | {"ExclusiveStartKey": cursor}

    def get_items(
        self, keys: List[Dict[str, Any]], attributes: List[str] = None, in_transaction: bool = False
    ) -> Generator[Dict[str, Any], None, None]:
        """
        Yield deserialized items for ``keys``, chunked to the service limit.

        :param keys: Primary keys (plain Python values) of the items to fetch.
        :param attributes: Optional attribute names; converted into a
            ``ProjectionExpression`` with ``ExpressionAttributeNames``
            placeholders so reserved words are safe to request.
        :param in_transaction: Use ``TransactGetItems`` instead of ``BatchGetItem``.
        """
        projection_attrs: Dict[str, Any] = {}
        if attributes:
            # Placeholders (#a0, #a1, ...) avoid clashes with reserved words.
            names = {f"#a{idx}": attribute for idx, attribute in enumerate(attributes)}
            projection_attrs["ExpressionAttributeNames"] = names
            projection_attrs["ProjectionExpression"] = ",".join(names)
        for key_chunk in self._batch_keys(keys):
            if in_transaction:
                items = self._get_items_transaction(key_chunk, projection_attrs)
            else:
                items = self._get_items_batch(key_chunk, projection_attrs)
            yield from items

    def _get_items_batch(
        self, keys: List[Dict[str, Any]], attributes: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """
        Gets a batch of items from Amazon DynamoDB in a batch from a single table.

        Retries ``UnprocessedKeys`` with capped exponential backoff and jitter.

        :raises ValueError: If more than ``MAX_BATCH_GET_SIZE`` keys are given.
        """
        if not keys:
            return []
        num_keys = len(keys)
        if num_keys > self.MAX_BATCH_GET_SIZE:
            raise ValueError(
                f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
            )
        request_items = {
            self._table_name: {
                # Serialize each key individually: BatchGetItem expects a list
                # of typed key maps, not a single serialized list ({"L": ...}).
                "Keys": [self._serialize(key) for key in keys],
            }
        }
        if attributes:
            request_items[self._table_name] |= attributes
        response = self._client.batch_get_item(RequestItems=request_items)
        items = response.get("Responses", {}).get(self._table_name, [])
        results: List[Dict[str, Any]] = [self._deserialize(item) for item in items]
        num_retries = 0
        while response.get("UnprocessedKeys", {}).get(self._table_name, None) is not None:
            unprocessed = response["UnprocessedKeys"]
            # Count the actual unprocessed keys, not the entries of the
            # per-table request dict ("Keys", "ProjectionExpression", ...).
            num_keys = len(unprocessed[self._table_name].get("Keys", []))
            num_retries += 1
            if num_retries > self.MAX_RETRIES:
                # Re-randomize instead of growing without bound: keeps the
                # backoff window capped while still jittering across clients.
                num_retries = random.randint(1, self.MAX_RETRIES)
            sleep_time = self.MIN_SLEEP_TIME * random.randint(1, 2**num_retries)
            logger.debug(
                f"Re-fetching {num_keys} keys, retry {num_retries} of {self.MAX_RETRIES}, sleeping for {sleep_time} seconds"
            )
            time.sleep(sleep_time)
            response = self._client.batch_get_item(RequestItems=unprocessed)
            items = response.get("Responses", {}).get(self._table_name, [])
            results.extend(self._deserialize(item) for item in items)
        return results

    def _get_items_transaction(
        self, keys: List[Dict[str, Any]], attributes: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """
        Gets a batch of items from Amazon DynamoDB in a transaction.

        :raises ValueError: If more than ``MAX_BATCH_GET_SIZE`` keys are given.
        """
        if not keys:
            return []
        num_keys = len(keys)
        if num_keys > self.MAX_BATCH_GET_SIZE:
            raise ValueError(
                f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
            )
        requests = []
        for key in keys:
            get: Dict[str, Any] = {
                "Key": self._serialize(key),
                "TableName": self._table_name,
            }
            if attributes:
                get |= attributes
            requests.append({"Get": get})
        response = self._client.transact_get_items(TransactItems=requests)
        responses = response.get("Responses", [])
        # A key with no matching item comes back as a response without an
        # "Item" entry — skip it instead of raising KeyError.
        return [self._deserialize(r["Item"]) for r in responses if "Item" in r]

    @classmethod
    def _batch_keys(cls, keys: List[Dict[str, Any]]) -> Generator[List[Dict[str, Any]], None, None]:
        """Yield ``keys`` in chunks of at most ``MAX_BATCH_GET_SIZE``."""
        end = len(keys)
        for index in range(0, end, cls.MAX_BATCH_GET_SIZE):
            yield keys[index : min(index + cls.MAX_BATCH_GET_SIZE, end)]

    @classmethod
    def _deserialize(cls, item: Any) -> Any:
        """Convert a DynamoDB-typed value into a plain Python value."""
        if not item:
            return item
        # Top-level items arrive as bare attribute maps without the "M"
        # wrapper the deserializer expects.
        if isinstance(item, dict) and "M" not in item:
            item = {"M": item}
        return cls._deserializer.deserialize(item)

    @classmethod
    def _serialize(cls, obj: Any) -> Dict[str, Any]:
        """Convert a plain Python value into its DynamoDB-typed form."""
        result = cls._serializer.serialize(obj)
        # Unwrap top-level maps: callers pass them directly as items/keys.
        if "M" in result:
            result = result["M"]
        return result
Alternative solutions
PynamoDB is one alternative that has its own model structure. I'm implementing a single-table design and wanted more control over the items being inserted into the table.
What does a good UX look like?
Users shouldn't be concerned with how DynamoDB internally represents types, so all outside->in and inside->out data conversions should be run through the DynamoDB TypeSerializer
and TypeDeserializer
libraries. Fetching individual attributes can also be simplified to List[str]
and then internally we build the ProjectionExpression
and populate ExpressionAttributeNames
.
Acknowledgment
- This feature request meets Lambda Powertools Tenets
- Should this be considered in other Lambda Powertools languages? i.e. Java, TypeScript
Metadata
Metadata
Assignees
Type
Projects
Status