diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 0e2637cc358..e135655ef61 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -12,8 +12,12 @@ EventType, FailureResponse, SuccessResponse, +) +from aws_lambda_powertools.utilities.batch.decorators import ( async_batch_processor, + async_process_partial_response, batch_processor, + process_partial_response, ) from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import ( @@ -22,6 +26,10 @@ from aws_lambda_powertools.utilities.batch.types import BatchTypeModels __all__ = ( + "async_batch_processor", + "async_process_partial_response", + "batch_processor", + "process_partial_response", "BatchProcessor", "AsyncBatchProcessor", "BasePartialProcessor", @@ -32,6 +40,4 @@ "FailureResponse", "SuccessResponse", "SqsFifoPartialProcessor", - "batch_processor", - "async_batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 4cdea6d28f5..210caf2bb14 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -11,19 +11,8 @@ import sys from abc import ABC, abstractmethod from enum import Enum -from typing import ( - Any, - Awaitable, - Callable, - Dict, - List, - Optional, - Tuple, - Union, - overload, -) +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload -from aws_lambda_powertools.middleware_factory import lambda_handler_decorator from aws_lambda_powertools.shared import constants from aws_lambda_powertools.utilities.batch.exceptions import ( BatchProcessingError, @@ -513,51 +502,6 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons return self.failure_handler(record=data, exception=sys.exc_info()) -@lambda_handler_decorator -def batch_processor( - handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor -): - """ - Middleware to handle batch event processing - - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: LambdaContext - Lambda's Context - record_handler: Callable - Callable or corutine to process each record from the batch - processor: BatchProcessor - Batch Processor to handle partial failure cases - - Examples - -------- - **Processes Lambda's event with a BasePartialProcessor** - - >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor - >>> - >>> def record_handler(record): - >>> return record["body"] - >>> - >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor()) - >>> def handler(event, context): - >>> return {"StatusCode": 200} - - Limitations - ----------- - * Async batch processors. Use `async_batch_processor` instead. - """ - records = event["Records"] - - with processor(records, record_handler, lambda_context=context): - processor.process() - - return handler(event, context) - - class AsyncBatchProcessor(BasePartialBatchProcessor): """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously. 
@@ -694,52 +638,3 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa return self._register_model_validation_error_record(record) except Exception: return self.failure_handler(record=data, exception=sys.exc_info()) - - -@lambda_handler_decorator -def async_batch_processor( - handler: Callable, - event: Dict, - context: LambdaContext, - record_handler: Callable[..., Awaitable[Any]], - processor: AsyncBatchProcessor, -): - """ - Middleware to handle batch event processing - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: LambdaContext - Lambda's Context - record_handler: Callable[..., Awaitable[Any]] - Callable to process each record from the batch - processor: AsyncBatchProcessor - Batch Processor to handle partial failure cases - Examples - -------- - **Processes Lambda's event with a BasePartialProcessor** - >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor - >>> - >>> async def async_record_handler(record): - >>> payload: str = record.body - >>> return payload - >>> - >>> processor = AsyncBatchProcessor(event_type=EventType.SQS) - >>> - >>> @async_batch_processor(record_handler=async_record_handler, processor=processor) - >>> async def lambda_handler(event, context: LambdaContext): - >>> return processor.response() - - Limitations - ----------- - * Sync batch processors. Use `batch_processor` instead. - """ - records = event["Records"] - - with processor(records, record_handler, lambda_context=context): - processor.async_process() - - return handler(event, context) diff --git a/aws_lambda_powertools/utilities/batch/decorators.py b/aws_lambda_powertools/utilities/batch/decorators.py new file mode 100644 index 00000000000..8f594c1479e --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/decorators.py @@ -0,0 +1,249 @@ +from __future__ import annotations + +from typing import Any, Awaitable, Callable, Dict, List + +from aws_lambda_powertools.middleware_factory import lambda_handler_decorator +from aws_lambda_powertools.utilities.batch import ( + AsyncBatchProcessor, + BasePartialBatchProcessor, + BatchProcessor, + EventType, +) +from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse +from aws_lambda_powertools.utilities.typing import LambdaContext + + +@lambda_handler_decorator +def async_batch_processor( + handler: Callable, + event: Dict, + context: LambdaContext, + record_handler: Callable[..., Awaitable[Any]], + processor: AsyncBatchProcessor, +): + """ + Middleware to handle batch event processing + + Notes + ----- + Consider using async_process_partial_response function for an easier experience. 
+ + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: LambdaContext + Lambda's Context + record_handler: Callable[..., Awaitable[Any]] + Callable to process each record from the batch + processor: AsyncBatchProcessor + Batch Processor to handle partial failure cases + + Examples + -------- + **Processes Lambda's event with an AsyncBatchProcessor** + >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor, EventType + >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + >>> + >>> processor = AsyncBatchProcessor(event_type=EventType.SQS) + >>> + >>> async def async_record_handler(record: SQSRecord): + >>> payload: str = record.body + >>> return payload + >>> + >>> @async_batch_processor(record_handler=async_record_handler, processor=processor) + >>> def lambda_handler(event, context): + >>> return processor.response() + + Limitations + ----------- + * Sync batch processors. Use `batch_processor` instead. + """ + records = event["Records"] + + with processor(records, record_handler, lambda_context=context): + processor.async_process() + + return handler(event, context) + + +@lambda_handler_decorator +def batch_processor( + handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor +): + """ + Middleware to handle batch event processing + + Notes + ----- + Consider using process_partial_response function for an easier experience. + + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: LambdaContext + Lambda's Context + record_handler: Callable + Callable or coroutine to process each record from the batch + processor: BatchProcessor + Batch Processor to handle partial failure cases + + Examples + -------- + **Processes Lambda's event with a BatchProcessor** + + >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType + >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + >>> + >>> processor = BatchProcessor(EventType.SQS) + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> @batch_processor(record_handler=record_handler, processor=processor) + >>> def handler(event, context): + >>> return processor.response() + + Limitations + ----------- + * Async batch processors. Use `async_batch_processor` instead. + """ + records = event["Records"] + + with processor(records, record_handler, lambda_context=context): + processor.process() + + return handler(event, context) + + +def process_partial_response( + event: Dict, + record_handler: Callable, + processor: BasePartialBatchProcessor, + context: LambdaContext | None = None, +) -> PartialItemFailureResponse: + """ + Higher-level function to handle batch event processing. 
+ + Parameters + ---------- + event: Dict + Lambda's original event + record_handler: Callable + Callable to process each record from the batch + processor: BasePartialBatchProcessor + Batch Processor to handle partial failure cases + context: LambdaContext + Lambda's context, used to optionally inject in record handler + + Returns + ------- + result: PartialItemFailureResponse + Lambda Partial Batch Response + + Examples + -------- + **Processes Lambda's SQS event** + + ```python + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response + from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + + processor = BatchProcessor(EventType.SQS) + + def record_handler(record: SQSRecord): + return record.body + + def handler(event, context): + return process_partial_response( + event=event, record_handler=record_handler, processor=processor, context=context + ) + ``` + + Limitations + ----------- + * Async batch processors. Use `async_process_partial_response` instead. + """ + try: + records: List[Dict] = event.get("Records", []) + except AttributeError: + event_types = ", ".join(list(EventType.__members__)) + docs = "https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line + raise ValueError( + f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n" + f"See sample events in our documentation for either {event_types}: \n {docs}" + ) + + with processor(records, record_handler, context): + processor.process() + + return processor.response() + + +def async_process_partial_response( + event: Dict, + record_handler: Callable, + processor: AsyncBatchProcessor, + context: LambdaContext | None = None, +) -> PartialItemFailureResponse: + """ + Higher-level function to handle batch event processing asynchronously. + + Parameters + ---------- + event: Dict + Lambda's original event + record_handler: Callable + Callable to process each record from the batch + processor: AsyncBatchProcessor + Batch Processor to handle partial failure cases + context: LambdaContext + Lambda's context, used to optionally inject in record handler + + Returns + ------- + result: PartialItemFailureResponse + Lambda Partial Batch Response + + Examples + -------- + **Processes Lambda's SQS event** + + ```python + from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response + from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + + processor = AsyncBatchProcessor(event_type=EventType.SQS) + + async def record_handler(record: SQSRecord): + return record.body + + def handler(event, context): + return async_process_partial_response( + event=event, record_handler=record_handler, processor=processor, context=context + ) + ``` + + Limitations + ----------- + * Sync batch processors. Use `process_partial_response` instead. + """ + try: + records: List[Dict] = event.get("Records", []) + except AttributeError: + event_types = ", ".join(list(EventType.__members__)) + docs = "https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line + raise ValueError( + f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. 
\n" + f"See sample events in our documentation for either {event_types}: \n {docs}" + ) + + with processor(records, record_handler, context): + processor.async_process() + + return processor.response() diff --git a/aws_lambda_powertools/utilities/batch/types.py b/aws_lambda_powertools/utilities/batch/types.py index 1fc5aba4fc4..89a5e4e3486 100644 --- a/aws_lambda_powertools/utilities/batch/types.py +++ b/aws_lambda_powertools/utilities/batch/types.py @@ -2,7 +2,9 @@ # type specifics # import sys -from typing import Optional, Type, Union +from typing import List, Optional, Type, Union + +from typing_extensions import TypedDict has_pydantic = "pydantic" in sys.modules @@ -22,3 +24,11 @@ else: BatchTypeModels = "BatchTypeModels" # type: ignore BatchSqsTypeModel = "BatchSqsTypeModel" # type: ignore + + +class PartialItemFailures(TypedDict): + itemIdentifier: str + + +class PartialItemFailureResponse(TypedDict): + batchItemFailures: List[PartialItemFailures] diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 0f899673c2e..47cc2147978 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -219,23 +219,28 @@ The remaining sections of the documentation will rely on these samples. For comp ### Processing messages from SQS -Processing batches from SQS works in four stages: +Processing batches from SQS works in three stages: 1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type 2. Define your function to handle each batch record, and use [`SQSRecord`](data_classes.md#sqs){target="_blank"} type annotation for autocompletion -3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing -4. Return the appropriate response contract to Lambda via **`.response()`** processor method +3. Use **`process_partial_response`** to kick off processing ???+ info This code example optionally uses Tracer and Logger for completion. 
-=== "As a decorator" +=== "Recommended" - ```python hl_lines="4-5 9 15 23 25" + ```python hl_lines="4 9 12 18 29" + --8<-- "examples/batch_processing/src/getting_started_sqs.py" + ``` + +=== "As a context manager" + + ```python hl_lines="4-5 9 15 24-26 28" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -254,14 +259,17 @@ Processing batches from SQS works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + return processor.response() ``` -=== "As a context manager" +=== "As a decorator (legacy)" - ```python hl_lines="4-5 9 15 24-26 28" + ```python hl_lines="4-5 9 15 23 25" import json from aws_lambda_powertools import Logger, Tracer @@ -284,11 +292,8 @@ Processing batches from SQS works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - return processor.response() ``` @@ -352,37 +357,48 @@ Processing batches from SQS works in four stages: When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. This helps preserve the ordering of messages in your queue. -=== "As a decorator" +=== "Recommended" - ```python hl_lines="5 11" - --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor.py" + ```python hl_lines="3 9" + --8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py" ``` === "As a context manager" - ```python hl_lines="4 8" - --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py" + ```python hl_lines="2 6" + --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py" + ``` + +=== "As a decorator (legacy)" + + ```python hl_lines="3 9" + --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py" ``` ### Processing messages from Kinesis -Processing batches from Kinesis works in four stages: +Processing batches from Kinesis works in three stages: 1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type 2. Define your function to handle each batch record, and use [`KinesisStreamRecord`](data_classes.md#kinesis-streams){target="_blank"} type annotation for autocompletion -3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing -4. Return the appropriate response contract to Lambda via **`.response()`** processor method +3. 
Use **`process_partial_response`** to kick off processing ???+ info This code example optionally uses Tracer and Logger for completion. -=== "As a decorator" +=== "Recommended" - ```python hl_lines="4-5 9 15 22 24" + ```python hl_lines="2 7 12 18 28" + --8<-- "examples/batch_processing/src/getting_started_kinesis.py" + ``` + +=== "As a context manager" + + ```python hl_lines="4-5 9 15 23-25 27" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -400,16 +416,17 @@ Processing batches from Kinesis works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + return processor.response() ``` -=== "As a context manager" - - ```python hl_lines="4-5 9 15 23-25 27" - import json +=== "As a decorator (legacy)" + ```python hl_lines="2-3 7 20 22" from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord @@ -429,11 +446,8 @@ Processing batches from Kinesis works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - return processor.response() ``` @@ -494,23 +508,28 @@ Processing batches from Kinesis works in four stages: ### Processing messages from DynamoDB -Processing batches from Kinesis works in four stages: +Processing batches from DynamoDB works in three stages: 1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type 2. Define your function to handle each batch record, and use [`DynamoDBRecord`](data_classes.md#dynamodb-streams){target="_blank"} type annotation for autocompletion -3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing -4. Return the appropriate response contract to Lambda via **`.response()`** processor method +3. Use **`process_partial_response`** to kick off processing ???+ info This code example optionally uses Tracer and Logger for completion. 
-=== "As a decorator" +=== "Recommended" - ```python hl_lines="4-5 9 15 25 27" + ```python hl_lines="4 9 14 20 30" + --8<-- "examples/batch_processing/src/getting_started_dynamodb.py" + ``` + +=== "As a context manager" + + ```python hl_lines="4-5 9 15 23-27" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -524,21 +543,21 @@ Processing batches from Kinesis works in four stages: def record_handler(record: DynamoDBRecord): logger.info(record.dynamodb.new_image) payload: dict = json.loads(record.dynamodb.new_image.get("Message")) - # alternatively: - # changes: Dict[str, Any] = record.dynamodb.new_image - # payload = change.get("Message").raw_event -> {"S": ""} ... @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + return processor.response() ``` -=== "As a context manager" +=== "As a decorator (legacy)" - ```python hl_lines="4-5 9 15 26-28 30" + ```python hl_lines="4-5 9 15 22 24" import json from aws_lambda_powertools import Logger, Tracer @@ -555,19 +574,13 @@ Processing batches from Kinesis works in four stages: @tracer.capture_method def record_handler(record: DynamoDBRecord): logger.info(record.dynamodb.new_image) - payload: dict = json.loads(record.dynamodb.new_image.get("item")) - # alternatively: - # changes: Dict[str, Any] = record.dynamodb.new_image - # payload = change.get("Message") -> "" + payload: dict = json.loads(record.dynamodb.new_image.get("Message")) ... @logger.inject_lambda_context @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - return processor.response() ``` @@ -648,16 +661,11 @@ All records in the batch will be passed to this handler for processing, even if * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing * **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing -???+ warning - You will not have access to the **processed messages** within the Lambda Handler; use context manager for that. - - All processing logic will and should be performed by the `record_handler` function. - ### Processing messages asynchronously !!! tip "New to AsyncIO? Read this [comprehensive guide first](https://realpython.com/async-io-python/){target="_blank"}." -You can use `AsyncBatchProcessor` class and `async_batch_processor` decorator to process messages concurrently. +You can use `AsyncBatchProcessor` class and `async_process_partial_response` function to process messages concurrently. ???+ question "When is this useful?" 
Your use case might be able to process multiple records at the same time without conflicting with one another. @@ -666,8 +674,8 @@ You can use `AsyncBatchProcessor` class and `async_batch_processor` decorator to The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order). -```python hl_lines="4 6 11 14 23" title="High-concurrency with AsyncBatchProcessor" ---8<-- "examples/batch_processing/src/getting_started_async_batch_processor.py" +```python hl_lines="3 11 14 24" title="High-concurrency with AsyncBatchProcessor" +--8<-- "examples/batch_processing/src/getting_started_async.py" ``` ???+ warning "Using tracer?" @@ -685,13 +693,14 @@ Inheritance is importance because we need to access message IDs and sequence num === "SQS" - ```python hl_lines="5 9-10 12-19 21 27" + ```python hl_lines="5 13 22 28" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response from aws_lambda_powertools.utilities.parser.models import SqsRecordModel from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.parser import validator, BaseModel class Order(BaseModel): @@ -717,25 +726,26 @@ Inheritance is importance because we need to access message IDs and sequence num @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` === "Kinesis Data Streams" - ```python hl_lines="5 9-10 12-20 22-23 26 32" + ```python hl_lines="5 14 25 29 35" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response + from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecordPayload, KinesisDataStreamRecord + from aws_lambda_powertools.utilities.parser import BaseModel, validator from aws_lambda_powertools.utilities.typing import LambdaContext class Order(BaseModel): item: dict + class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): data: Order @@ -743,10 +753,11 @@ Inheritance is importance because we need to access message IDs and sequence num # so Pydantic can auto-initialize nested Order model @validator("data", pre=True) def transform_message_to_dict(cls, value: str): - # Powertools KinesisDataStreamRecordModel already decodes b64 to str here + # Powertools KinesisDataStreamRecord already decodes b64 to str here return json.loads(value) - class OrderKinesisRecord(KinesisDataStreamRecordModel): + + class OrderKinesisRecord(KinesisDataStreamRecord): kinesis: OrderKinesisPayloadRecord @@ -762,27 +773,28 @@ Inheritance is importance because we need to access message IDs and sequence num @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return 
process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` === "DynamoDB Streams" - ```python hl_lines="7 11-12 14-21 23-25 27-28 31 37" + ```python hl_lines="7 16 26 31 35 41" import json - from typing import Dict, Literal + from typing import Dict, Literal, Optional from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response + from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.parser import BaseModel, validator class Order(BaseModel): item: dict + class OrderDynamoDB(BaseModel): Message: Order @@ -792,15 +804,17 @@ Inheritance is importance because we need to access message IDs and sequence num def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): return json.loads(value["S"]) + class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): NewImage: Optional[OrderDynamoDB] OldImage: Optional[OrderDynamoDB] + class OrderDynamoDBRecord(DynamoDBStreamRecordModel): dynamodb: OrderDynamoDBChangeRecord - processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderKinesisRecord) + processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) tracer = Tracer() logger = Logger() @@ -812,9 +826,8 @@ Inheritance is importance because we need to access message IDs and sequence num @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` ### Accessing processed messages @@ -824,7 +837,7 @@ Use the context manager to access a list of all returned values from your `recor * **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record -```python hl_lines="31-38" title="Accessing processed messages via context manager" +```python hl_lines="30-36" title="Accessing processed messages via context manager" import json from typing import Any, List, Literal, Union @@ -833,8 +846,7 @@ from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType, FailureResponse, - SuccessResponse, - batch_processor) + SuccessResponse) from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -873,7 +885,13 @@ Within your `record_handler` function, you might need access to the Lambda conte We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/python-context.html){target="_blank"} into your `record_handler` if your function signature has a parameter named `lambda_context`. When using a context manager, you also need to pass the Lambda context object like in the example below. 
-=== "As a decorator" +=== "Recommended" + + ```python hl_lines="19" + --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context.py" + ``` + +=== "As a decorator (legacy)" ```python hl_lines="15" from typing import Optional @@ -952,7 +970,7 @@ from typing import Tuple from aws_lambda_powertools import Metrics from aws_lambda_powertools.metrics import MetricUnit -from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, ExceptionInfo, EventType, FailureResponse +from aws_lambda_powertools.utilities.batch import BatchProcessor, ExceptionInfo, EventType, FailureResponse, process_partial_response from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord @@ -973,9 +991,8 @@ def record_handler(record: SQSRecord): ... @metrics.log_metrics(capture_cold_start_metric=True) -@batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` ### Create your own partial processor @@ -1157,7 +1174,7 @@ Given a SQS batch where the first batch record succeeds and the second fails pro import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -1176,9 +1193,8 @@ Given a SQS batch where the first batch record succeeds and the second fails pro @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` === "Sample SQS event" @@ -1228,6 +1244,12 @@ Given a SQS batch where the first batch record succeeds and the second fails pro Use context manager when you want access to the processed messages or handle `BatchProcessingError` exception when all records within the batch fail to be processed. +### What's the difference between the decorator and process_partial_response functions? + +`batch_processor` and `async_batch_processor` decorators are now considered legacy. Historically, they were kept due to backwards compatibility and to minimize code changes between V1 and V2. + +As 2.12.0, `process_partial_response` and `async_process_partial_response` are the recommended instead. It reduces boilerplate, smaller memory/CPU cycles, and it makes it less error prone - e.g., decorators required an additional return. 
+ ### Integrating exception handling with Sentry.io When using Sentry.io for error monitoring, you can override `failure_handler` to capture each processing exception with Sentry SDK: diff --git a/examples/batch_processing/src/advanced_accessing_lambda_context.py b/examples/batch_processing/src/advanced_accessing_lambda_context.py new file mode 100644 index 00000000000..96d95ca5445 --- /dev/null +++ b/examples/batch_processing/src/advanced_accessing_lambda_context.py @@ -0,0 +1,30 @@ +import json +from typing import Optional + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_async_batch_processor.py b/examples/batch_processing/src/getting_started_async.py similarity index 79% rename from examples/batch_processing/src/getting_started_async_batch_processor.py rename to examples/batch_processing/src/getting_started_async.py index 594be0540f3..304c01795bb 100644 --- a/examples/batch_processing/src/getting_started_async_batch_processor.py +++ b/examples/batch_processing/src/getting_started_async.py @@ -3,7 +3,7 @@ from aws_lambda_powertools.utilities.batch import ( AsyncBatchProcessor, EventType, - async_batch_processor, + async_process_partial_response, ) from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -20,6 +20,7 @@ async def async_record_handler(record: SQSRecord): return ret.status_code -@async_batch_processor(record_handler=async_record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return async_process_partial_response( + event=event, record_handler=async_record_handler, processor=processor, context=context + ) diff --git a/examples/batch_processing/src/getting_started_dynamodb.py b/examples/batch_processing/src/getting_started_dynamodb.py new file mode 100644 index 00000000000..60d8ed89f0e --- /dev/null +++ b/examples/batch_processing/src/getting_started_dynamodb.py @@ -0,0 +1,30 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( + DynamoDBRecord, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.DynamoDBStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: DynamoDBRecord): + logger.info(record.dynamodb.new_image) # type: ignore[union-attr] + payload: dict = json.loads(record.dynamodb.new_image.get("Message")) # type: ignore[union-attr,arg-type] + 
logger.info(payload) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_kinesis.py b/examples/batch_processing/src/getting_started_kinesis.py new file mode 100644 index 00000000000..e58222733e1 --- /dev/null +++ b/examples/batch_processing/src/getting_started_kinesis.py @@ -0,0 +1,28 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import ( + KinesisStreamRecord, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.KinesisDataStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: KinesisStreamRecord): + logger.info(record.kinesis.data_as_text) + payload: dict = record.kinesis.data_as_json() + logger.info(payload) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_sqs.py b/examples/batch_processing/src/getting_started_sqs.py new file mode 100644 index 00000000000..15f8701f297 --- /dev/null +++ b/examples/batch_processing/src/getting_started_sqs.py @@ -0,0 +1,29 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_sqs_fifo.py b/examples/batch_processing/src/getting_started_sqs_fifo.py new file mode 100644 index 00000000000..d39f8ba63f1 --- /dev/null +++ b/examples/batch_processing/src/getting_started_sqs_fifo.py @@ -0,0 +1,22 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + SqsFifoPartialProcessor, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = SqsFifoPartialProcessor() +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + ... 
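+    # NOTE: replace `...` above with your processing logic. If it raises an exception,
+    # SqsFifoPartialProcessor stops at the first failure and reports the remaining
+    # records as failures too, preserving the FIFO ordering described in the docs above.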
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py b/examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py similarity index 100% rename from examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py rename to examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor.py b/examples/batch_processing/src/getting_started_sqs_fifo_decorator.py similarity index 100% rename from examples/batch_processing/src/sqs_fifo_batch_processor.py rename to examples/batch_processing/src/getting_started_sqs_fifo_decorator.py diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 2205d47660c..4ad33b290bd 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -12,7 +12,9 @@ EventType, SqsFifoPartialProcessor, async_batch_processor, + async_process_partial_response, batch_processor, + process_partial_response, ) from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( @@ -775,6 +777,70 @@ def test_async_batch_processor_context_with_failure(sqs_event_factory, async_rec } + +def test_process_partial_response(sqs_event_factory, record_handler): + # GIVEN + records = [sqs_event_factory("success"), sqs_event_factory("success")] + batch = {"Records": records} + processor = BatchProcessor(event_type=EventType.SQS) + + # WHEN + ret = process_partial_response(batch, record_handler, processor) + + # THEN + assert ret == {"batchItemFailures": []} + + +@pytest.mark.parametrize( + "batch", + [ + pytest.param(123456789, id="num"), + pytest.param([], id="list"), + pytest.param(False, id="bool"), + pytest.param(object, id="object"), + pytest.param(lambda x: x, id="callable"), + ], +) +def test_process_partial_response_invalid_input(record_handler: Callable, batch: Any): + # GIVEN + processor = BatchProcessor(event_type=EventType.SQS) + + # WHEN/THEN + with pytest.raises(ValueError): + process_partial_response(batch, record_handler, processor) + + +def test_async_process_partial_response(sqs_event_factory, async_record_handler): + # GIVEN + records = [sqs_event_factory("success"), sqs_event_factory("success")] + batch = {"Records": records} + processor = AsyncBatchProcessor(event_type=EventType.SQS) + + # WHEN + ret = async_process_partial_response(batch, async_record_handler, processor) + + # THEN + assert ret == {"batchItemFailures": []} + + +@pytest.mark.parametrize( + "batch", + [ + pytest.param(123456789, id="num"), + pytest.param([], id="list"), + pytest.param(False, id="bool"), + pytest.param(object, id="object"), + pytest.param(lambda x: x, id="callable"), + ], +) +def test_async_process_partial_response_invalid_input(async_record_handler: Callable, batch: Any): + # GIVEN + processor = AsyncBatchProcessor(event_type=EventType.SQS) + + # WHEN/THEN + with pytest.raises(ValueError): + async_process_partial_response(batch, async_record_handler, processor) + + def test_batch_processor_model_with_partial_validation_error( record_handler_model: Callable, sqs_event_factory, order_event_factory ):