Skip to content

Feature request: DynamoDB Helper Utilities #2053

Open
@jplock

Description

@jplock

Use case

When working with DynamoDB, depending on the action being performed, there are quality-of-life improvements that can be made to simplify the overall experience. For example:

  • When calling GetItem, automatically populate ExpressionAttributeNames from a list of attribute names
  • When calling Query with Limit, automatically paginate with LastEvaluatedKey until the desired number of items is returned
    • This could also include methods that make it easier to define the KeyConditionExpression and that populate ExpressionAttributeNames and ExpressionAttributeValues.
  • When calling BatchGetItem, automatically retry UnprocessedKeys using exponential backoff with jitter.
  • Automatically serialize Python objects into DynamoDB's typed format and deserialize DynamoDB items back into Python objects

Solution/User Experience

A helper class that implements some of these methods, such as:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import time
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple

import boto3
from aws_lambda_powertools import Logger
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.config import Config

if TYPE_CHECKING:
    from mypy_boto3_dynamodb import DynamoDBClient

# Module-level powertools logger. child=True presumably attaches it to the parent Logger
# configured by the Lambda entry point (service name, level) — confirm against the
# aws_lambda_powertools Logger docs.
logger = Logger(child=True)

# Public API of this module: the names exported by ``from <module> import *``.
__all__ = ["DynamoDBHelpers", "TransactionWriter"]


class TransactionWriter:
    """
    Buffer DynamoDB transactional write actions and send them with ``transact_write_items``.

    Each ``put_item`` / ``update_item`` / ``delete_item`` / ``check_item`` call queues one
    action, tagged with this writer's table name. Once ``flush_amount`` actions are buffered,
    they are sent in a single ``transact_write_items`` call. Leaving the ``with`` block sends
    whatever is still buffered.

    Only the actions inside one ``transact_write_items`` call are sent together. When more
    than ``flush_amount`` actions are queued, they are split across several independent calls.
    """

    def __init__(self, table_name: str, client: "DynamoDBClient", flush_amount: int = 100) -> None:
        """
        :param table_name: table name injected into every queued action as ``TableName``.
        :param client: low-level DynamoDB client used to send the transactions.
        :param flush_amount: number of buffered actions that triggers a send; must be >= 1.
        :raises ValueError: if ``flush_amount`` is less than 1.
        """
        # With flush_amount < 1, _flush could never drain the buffer: the slice it sends would
        # be empty (0) or nonsensical (negative), and __exit__ would never finish.
        if flush_amount < 1:
            raise ValueError(f"flush_amount must be at least 1, got {flush_amount}")
        self._table_name = table_name
        self._client = client
        self._flush_amount = flush_amount
        self._items_buffer: List[Dict[str, Any]] = []

    def put_item(self, item: Dict[str, Any]) -> None:
        """Queue a ``Put`` action; ``item`` holds the action's parameters and is not modified."""
        self._queue_action("Put", item)

    def update_item(self, item: Dict[str, Any]) -> None:
        """Queue an ``Update`` action; ``item`` holds the action's parameters and is not modified."""
        self._queue_action("Update", item)

    def delete_item(self, item: Dict[str, Any]) -> None:
        """Queue a ``Delete`` action; ``item`` holds the action's parameters and is not modified."""
        self._queue_action("Delete", item)

    def check_item(self, item: Dict[str, Any]) -> None:
        """Queue a ``ConditionCheck`` action; ``item`` is not modified."""
        self._queue_action("ConditionCheck", item)

    def _queue_action(self, action: str, item: Dict[str, Any]) -> None:
        """Buffer ``{action: item + TableName}``, flushing if the buffer is full."""
        # Build a new dict instead of writing TableName into the caller's dict. Otherwise a
        # dict reused across calls (or across writers for other tables) would be mutated.
        self._add_request_and_process({action: {**item, "TableName": self._table_name}})

    def _add_request_and_process(self, request: Dict[str, Any]) -> None:
        """Append one transact-write action to the buffer and flush if it is full."""
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _flush_if_needed(self) -> None:
        """Send one chunk once at least ``flush_amount`` actions are buffered."""
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self) -> None:
        """Send up to ``flush_amount`` buffered actions in one ``transact_write_items`` call."""
        items_to_send = self._items_buffer[: self._flush_amount]
        if not items_to_send:
            # Never issue a request with an empty TransactItems list.
            return
        self._items_buffer = self._items_buffer[self._flush_amount :]

        self._client.transact_write_items(TransactItems=items_to_send)
        logger.debug(
            f"Transaction write sent {len(items_to_send)}, unprocessed: {len(self._items_buffer)}"
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Send every remaining buffered action, one ``flush_amount`` chunk at a time."""
        # NOTE(review): this also runs when the with-block raised, so actions queued before
        # the error are still written — confirm that is intended for a "transaction" writer.
        while self._items_buffer:
            self._flush()


class DynamoDBHelpers:
    """
    Convenience wrapper around a low-level DynamoDB client bound to a single table.

    Callers pass plain Python values. Keys are converted to DynamoDB AttributeValue format
    with ``TypeSerializer``, and items returned by ``get_items`` are converted back with
    ``TypeDeserializer``.
    """

    # Maximum number of keys sent in one BatchGetItem / TransactGetItems request.
    MAX_BATCH_GET_SIZE = 100

    # Maximum number of actions per TransactWriteItems request made by transaction_writer().
    MAX_TRANSACTION_WRITE_SIZE = 100

    # Maximum number of retries: botocore attempts per call, and UnprocessedKeys retries.
    MAX_RETRIES = 15

    # Minimum sleep time, in seconds, between UnprocessedKeys retries.
    MIN_SLEEP_TIME = 1e-2

    # Upper bound, in seconds, on any single UnprocessedKeys backoff sleep.
    MAX_SLEEP_TIME = 5.0

    _deserializer = TypeDeserializer()
    _serializer = TypeSerializer()
    _client = None

    def __init__(
        self, table_name: str, region: str, session: Optional[boto3.Session] = None
    ) -> None:
        """
        :param table_name: table every request is sent to.
        :param region: AWS region passed to the DynamoDB client.
        :param session: boto3 session used to build the client; boto3's default session if omitted.
        """
        if not session:
            # NOTE(review): _get_default_session is a private boto3 helper.
            session = boto3._get_default_session()

        config = Config(
            retries={
                "max_attempts": self.MAX_RETRIES,
                "mode": "standard",
            }
        )

        self._client: "DynamoDBClient" = session.client(
            "dynamodb", region_name=region, config=config
        )
        self._table_name = table_name

    @property
    def client(self):
        """The underlying low-level DynamoDB client."""
        return self._client

    def transaction_writer(self) -> TransactionWriter:
        """Return a TransactionWriter for this table that flushes every MAX_TRANSACTION_WRITE_SIZE actions."""
        return TransactionWriter(self._table_name, self._client, self.MAX_TRANSACTION_WRITE_SIZE)

    def paginated_query(
        self,
        params: Dict[str, Any],
        page_size: int,
        acc: Optional[List[Dict[str, Any]]] = None,
    ) -> Tuple[List, bool]:
        """
        Run ``client.query`` page after page until ``page_size`` items are collected or the data ends.

        Pages are followed through ``LastEvaluatedKey`` in a loop, not by recursion. Long runs
        of empty (e.g. fully filtered) pages therefore cannot hit Python's recursion limit.

        :param params: keyword arguments for ``client.query``; passed straight to the low-level
            client, so values must already be AttributeValue-typed. Not modified.
        :param page_size: maximum number of items to return.
        :param acc: items collected by an earlier call; copied, never modified.
        :return: ``(items, has_more)``. ``items`` are the raw (not deserialized) query items.
            ``has_more`` tells whether matching items exist beyond the returned ones.

        @see https://dev.to/andyrichardsonn/graphql-pagination-with-dynamodb-putting-it-together-20mn
        """
        items: List[Dict[str, Any]] = [] if acc is None else list(acc)
        request = params

        while True:
            logger.debug("paginated_query", params=request, page_size=page_size)

            # Clamp at 0: if acc already holds more than page_size items, a negative slice
            # bound would silently take items from the wrong end of the page.
            remaining = max(page_size - len(items), 0)
            result = self._client.query(**request)
            new_items = result.get("Items", [])
            items.extend(new_items[:remaining])

            cursor = result.get("LastEvaluatedKey")
            if not cursor:
                # Final page: there is more data only if this page overflowed what we needed.
                return items, len(new_items) > remaining

            if len(new_items) > remaining:
                # This page alone already proves items exist beyond page_size.
                return items, True

            # Either still short of page_size, or exactly full with a cursor left. Fetch the
            # next page to keep filling, or to learn whether anything lies beyond it.
            request = request | {"ExclusiveStartKey": cursor}

    def get_items(
        self,
        keys: List[Dict[str, Any]],
        attributes: Optional[List[str]] = None,
        in_transaction: bool = False,
    ) -> Generator[Dict[str, Any], None, None]:
        """
        Yield the stored items for ``keys``, deserialized to plain Python values.

        Keys with no stored item produce nothing.

        :param keys: primary keys as plain Python dicts; serialized internally.
        :param attributes: optional attribute names to fetch. They become a ProjectionExpression
            built from ``#aN`` placeholders listed in ExpressionAttributeNames.
        :param in_transaction: use ``transact_get_items`` instead of ``batch_get_item``.
        """
        projection_attrs: Dict[str, Any] = {}

        if attributes:
            # Placeholders keep attribute names out of the expression text itself, so names
            # that clash with DynamoDB reserved words still work.
            names = {f"#a{idx}": attribute for idx, attribute in enumerate(attributes)}
            projection_attrs["ExpressionAttributeNames"] = names
            projection_attrs["ProjectionExpression"] = ",".join(names)

        for key_chunk in self._batch_keys(keys):
            if in_transaction:
                yield from self._get_items_transaction(key_chunk, projection_attrs)
            else:
                yield from self._get_items_batch(key_chunk, projection_attrs)

    def _get_items_batch(
        self, keys: List[Dict[str, Any]], attributes: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Get up to MAX_BATCH_GET_SIZE items from this table with ``batch_get_item``.

        UnprocessedKeys are retried with capped, jittered exponential backoff.

        :param keys: plain-Python primary keys (at most MAX_BATCH_GET_SIZE).
        :param attributes: extra request fields (e.g. ProjectionExpression) merged into the
            table's request entry.
        :return: deserialized items, in the order the responses delivered them.
        :raises Exception: if more than MAX_BATCH_GET_SIZE keys are given.
        :raises RuntimeError: if keys are still unprocessed after MAX_RETRIES retries.
        """
        if not keys:
            return []

        num_keys = len(keys)
        if num_keys > self.MAX_BATCH_GET_SIZE:
            raise Exception(
                f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
            )

        # BatchGetItem takes a *list* of key maps, so serialize each key on its own.
        # Serializing the whole list would wrap it in a single {"L": [...]} AttributeValue.
        request_items = {
            self._table_name: {
                "Keys": [self._serialize(key) for key in keys],
            }
        }

        if attributes:
            request_items[self._table_name] |= attributes

        response = self._client.batch_get_item(RequestItems=request_items)
        results = self._deserialize_batch_response(response)

        num_retries = 0
        while True:
            pending = response.get("UnprocessedKeys", {}).get(self._table_name)
            # Stop once nothing is left. An empty entry must not be re-sent as a request.
            if not pending:
                break

            num_keys = len(pending.get("Keys", []))

            num_retries += 1
            if num_retries > self.MAX_RETRIES:
                raise RuntimeError(
                    f"{num_keys} keys still unprocessed after {self.MAX_RETRIES} retries"
                )

            # Exponential backoff with jitter, capped so a single sleep never exceeds
            # MAX_SLEEP_TIME (uncapped, 2**15 * 10ms would be over five minutes).
            backoff_ceiling = min(self.MAX_SLEEP_TIME, self.MIN_SLEEP_TIME * 2**num_retries)
            sleep_time = random.uniform(self.MIN_SLEEP_TIME, backoff_ceiling)

            logger.debug(
                f"Re-fetching {num_keys} keys, retry {num_retries} of {self.MAX_RETRIES}, sleeping for {sleep_time} seconds"
            )
            time.sleep(sleep_time)

            # The unprocessed entry carries its own Keys plus any projection fields.
            response = self._client.batch_get_item(RequestItems={self._table_name: pending})
            results.extend(self._deserialize_batch_response(response))

        return results

    def _deserialize_batch_response(self, response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Deserialize this table's items from one ``batch_get_item`` response."""
        items = response.get("Responses", {}).get(self._table_name, [])
        return [self._deserialize(item) for item in items]

    def _get_items_transaction(
        self, keys: List[Dict[str, Any]], attributes: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Get up to MAX_BATCH_GET_SIZE items from this table in one ``transact_get_items`` call.

        :param keys: plain-Python primary keys (at most MAX_BATCH_GET_SIZE).
        :param attributes: extra fields (e.g. ProjectionExpression) merged into every Get.
        :return: deserialized items for the keys that exist, in response order.
        :raises Exception: if more than MAX_BATCH_GET_SIZE keys are given.
        """
        if not keys:
            return []

        num_keys = len(keys)
        if num_keys > self.MAX_BATCH_GET_SIZE:
            raise Exception(
                f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
            )

        transact_items = []
        for key in keys:
            get_request = {
                "Key": self._serialize(key),
                "TableName": self._table_name,
            }
            if attributes:
                get_request |= attributes
            transact_items.append({"Get": get_request})

        response = self._client.transact_get_items(TransactItems=transact_items)
        # A key with no stored item yields a response entry without "Item". Skip it instead
        # of raising KeyError; the batch path likewise simply omits missing items.
        return [
            self._deserialize(entry["Item"])
            for entry in response.get("Responses", [])
            if entry and "Item" in entry
        ]

    @classmethod
    def _batch_keys(cls, keys: List[Dict[str, Any]]) -> Generator[List[Dict[str, Any]], None, None]:
        """Yield consecutive slices of ``keys`` holding at most MAX_BATCH_GET_SIZE keys each."""
        for start in range(0, len(keys), cls.MAX_BATCH_GET_SIZE):
            # Slicing past the end of a list is safe, so no explicit min() is needed.
            yield keys[start : start + cls.MAX_BATCH_GET_SIZE]

    @classmethod
    def _deserialize(cls, item: Any) -> Any:
        """
        Convert a DynamoDB item (attribute name -> AttributeValue) into plain Python values.

        Falsy input (None, {}) is returned unchanged. A dict is always treated as an item's
        attribute map. Do not sniff for an "M" key here: a real item may have an attribute
        literally named "M", and it would then be mis-read as a typed map value.
        """
        if not item:
            return item

        if isinstance(item, dict):
            item = {"M": item}

        return cls._deserializer.deserialize(item)

    @classmethod
    def _serialize(cls, obj: Any) -> Dict[str, Any]:
        """
        Serialize ``obj`` with TypeSerializer.

        A dict comes back as a bare attribute map (its {"M": ...} wrapper removed), ready to
        use as an item or key. Any other value keeps its typed wrapper (e.g. {"S": ...}).
        """
        result = cls._serializer.serialize(obj)
        if "M" in result:
            result = result["M"]
        return result

Alternative solutions

PynamoDB is one alternative that has its own model structure. I'm implementing a single-table design and wanted more control over the items being inserted into the table.

What does a good UX look like?

Users shouldn't need to be concerned with how DynamoDB represents types internally, so all data conversions in both directions (Python to DynamoDB and DynamoDB to Python) should be run through boto3's TypeSerializer and TypeDeserializer classes. Fetching individual attributes can also be simplified to a List[str] of attribute names, from which the helper internally builds the ProjectionExpression and populates ExpressionAttributeNames.

Acknowledgment

Metadata

Metadata

Assignees

No one assigned

    Labels

    feature-request (feature request), need-customer-feedback (Requires more customer feedback before making or revisiting a decision)

    Type

    No type

    Projects

    Status

    Ideas

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions