diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index aa284e7f38b..ca4606e0f40 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -5,13 +5,13 @@ description: Utility
The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
-**Key Features**
+## Key Features
* Prevent successfully processed messages being returned to SQS
* Simple interface for individually processing messages from a batch
* Build your own batch processor using the base classes
-**Background**
+## Background
When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS.
@@ -25,35 +25,76 @@ are returned to the queue.
More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html)
+## Getting started
-**IAM Permissions**
+### IAM Permissions
-This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission.
+Before your use this utility, your AWS Lambda function must have `sqs:DeleteMessageBatch` permission to delete successful messages directly from the queue.
-## Processing messages from SQS
+> Example using AWS Serverless Application Model (SAM)
-You can use either **[sqs_batch_processor](#sqs_batch_processor-decorator)** decorator, or **[PartialSQSProcessor](#partialsqsprocessor-context-manager)** as a context manager.
+=== "template.yml"
+ ```yaml hl_lines="2-3 12-15"
+ Resources:
+ MyQueue:
+ Type: AWS::SQS::Queue
-They have nearly the same behaviour when it comes to processing messages from the batch:
+ HelloWorldFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Runtime: python3.8
+ Environment:
+ Variables:
+ POWERTOOLS_SERVICE_NAME: example
+ Policies:
+ - SQSPollerPolicy:
+ QueueName:
+ !GetAtt MyQueue.QueueName
+ ```
-* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
-* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `record handler`, we will:
- - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
- - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+### Processing messages from SQS
-The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need.
+You can use either **[sqs_batch_processor](#sqs_batch_processor-decorator)** decorator, or **[PartialSQSProcessor](#partialsqsprocessor-context-manager)** as a context manager if you'd like access to the processed results.
-## Record Handler
+You need to create a function to handle each record from the batch - We call it `record_handler` from here on.
-Both decorator and context managers require an explicit function to process the batch of messages - namely `record_handler` parameter.
+=== "Decorator"
-This function is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent.
+ ```python hl_lines="3 6"
+ from aws_lambda_powertools.utilities.batch import sqs_batch_processor
-**Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.
+ def record_handler(record):
+ return do_something_with(record["body"])
-### sqs_batch_processor decorator
+ @sqs_batch_processor(record_handler=record_handler)
+ def lambda_handler(event, context):
+ return {"statusCode": 200}
+ ```
+=== "Context manager"
-When using this decorator, you need provide a function via `record_handler` param that will process individual messages from the batch - It should raise an exception if it is unable to process the record.
+ ```python hl_lines="3 9 11-12"
+ from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+
+ def record_handler(record):
+ return_value = do_something_with(record["body"])
+ return return_value
+
+ def lambda_handler(event, context):
+ records = event["Records"]
+ processor = PartialSQSProcessor()
+
+ with processor(records, record_handler) as proc:
+ result = proc.process() # Returns a list of all results from record_handler
+
+ return result
+ ```
+
+!!! tip
+ **Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.
+
+ If the entire batch succeeds, we let Lambda to proceed in deleting the records from the queue for cost reasons.
+
+### Partial failure mechanics
All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
@@ -61,29 +102,26 @@ All records in the batch will be passed to this handler for processing, even if
* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
!!! warning
- You will not have accessed to the processed messages within the Lambda Handler - all processing logic will and should be performed by the record_handler
function.
+ You will not have accessed to the **processed messages** within the Lambda Handler.
-=== "app.py"
+ All processing logic will and should be performed by the `record_handler` function.
- ```python
- from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+## Advanced
- def record_handler(record):
- # This will be called for each individual message from a batch
- # It should raise an exception if the message was not processed successfully
- return_value = do_something_with(record["body"])
- return return_value
+### Choosing between decorator and context manager
- @sqs_batch_processor(record_handler=record_handler)
- def lambda_handler(event, context):
- return {"statusCode": 200}
- ```
+They have nearly the same behaviour when it comes to processing messages from the batch:
-### PartialSQSProcessor context manager
+* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
+* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `record handler`, we will:
+ - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
+ - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+
+The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need.
-If you require access to the result of processed messages, you can use this context manager.
+### Accessing processed messages
-The result from calling `process()` on the context manager will be a list of all the return values from your `record_handler` function.
+Use `PartialSQSProcessor` context manager to access a list of all return values from your `record_handler` function.
=== "app.py"
@@ -91,11 +129,7 @@ The result from calling `process()` on the context manager will be a list of all
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
def record_handler(record):
- # This will be called for each individual message from a batch
- # It should raise an exception if the message was not processed successfully
- return_value = do_something_with(record["body"])
- return return_value
-
+ return do_something_with(record["body"])
def lambda_handler(event, context):
records = event["Records"]
@@ -108,7 +142,7 @@ The result from calling `process()` on the context manager will be a list of all
return result
```
-## Passing custom boto3 config
+### Passing custom boto3 config
If you need to pass custom configuration such as region to the SDK, you can pass your own [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) to
the `sqs_batch_processor` decorator:
@@ -159,14 +193,15 @@ the `sqs_batch_processor` decorator:
```
-## Suppressing exceptions
+### Suppressing exceptions
If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument.
=== "Decorator"
- ```python hl_lines="2"
- ...
+ ```python hl_lines="3"
+ from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+
@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True)
def lambda_handler(event, context):
return {"statusCode": 200}
@@ -174,15 +209,16 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r
=== "Context manager"
- ```python hl_lines="2"
- ...
+ ```python hl_lines="3"
+ from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+
processor = PartialSQSProcessor(config=config, suppress_exception=True)
with processor(records, record_handler):
result = processor.process()
```
-## Create your own partial processor
+### Create your own partial processor
You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
@@ -192,11 +228,9 @@ You can create your own partial batch processor by inheriting the `BasePartialPr
You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
-**Example:**
-
=== "custom_processor.py"
- ```python
+ ```python hl_lines="3 9 24 30 37 57"
from random import randint
from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
@@ -223,14 +257,12 @@ You can then use this class as a context manager, or pass it to `batch_processor
def _prepare(self):
# It's called once, *before* processing
# Creates table resource and clean previous results
- # E.g.:
self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
self.success_messages.clear()
def _clean(self):
# It's called once, *after* closing processing all records (closing the context manager)
# Here we're sending, at once, all successful messages to a ddb table
- # E.g.:
with ddb_table.batch_writer() as batch:
for result in self.success_messages:
batch.put_item(Item=result)
@@ -239,7 +271,6 @@ You can then use this class as a context manager, or pass it to `batch_processor
# It handles how your record is processed
# Here we're keeping the status of each run
# where self.handler is the record_handler function passed as an argument
- # E.g.:
try:
result = self.handler(record) # record_handler passed to decorator/context manager
return self.success_handler(record, result)
@@ -260,3 +291,24 @@ You can then use this class as a context manager, or pass it to `batch_processor
def lambda_handler(event, context):
return {"statusCode": 200}
```
+
+### Integrating exception handling with Sentry.io
+
+When using Sentry.io for error monitoring, you can override `failure_handler` to include to capture each processing exception:
+
+> Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732)
+
+=== "sentry_integration.py"
+
+ ```python hl_lines="4 7-8"
+ from typing import Tuple
+
+ from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+ from sentry_sdk import capture_exception
+
+ class SQSProcessor(PartialSQSProcessor):
+ def failure_handler(self, record: Event, exception: Tuple) -> Tuple: # type: ignore
+ capture_exception() # send exception to Sentry
+ logger.exception("got exception while processing SQS message")
+ return super().failure_handler(record, exception) # type: ignore
+ ```