Commit 14cc383

docs: fix suggestions made by @heitorlessa
1 parent ded3d75 commit 14cc383

File tree

4 files changed: +75 −39 lines


aws_lambda_powertools/utilities/batch/middlewares.py

Lines changed: 3 additions & 2 deletions

@@ -3,6 +3,7 @@
 """
 Middlewares for batch utilities
 """
+
 from typing import Callable, Dict

 from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
@@ -49,7 +50,7 @@ def batch_processor(
     """
     records = event["Records"]

-    with processor(records, record_handler) as ctx:
-        ctx.process()
+    with processor(records, record_handler):
+        processor.process()

     return handler(event, context)
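The middleware change above drops the `as ctx` alias and calls `process()` on the processor object itself, which assumes the processor is both callable and a context manager. A minimal sketch of that protocol follows; the class and names are illustrative stand-ins, not the real Powertools implementation.

```python
from typing import Callable, List


class SketchProcessor:
    """Illustrative processor protocol: callable to bind a batch, then
    usable as a context manager while `process()` runs the handler."""

    def __call__(self, records: List[dict], handler: Callable):
        # Bind the batch and per-record handler; returning self is what
        # lets `with processor(records, handler):` and `processor.process()`
        # operate on the same object
        self.records = records
        self.handler = handler
        return self

    def __enter__(self):
        # A real processor would run its `_prepare()` hook here
        return self

    def __exit__(self, exc_type, exc, tb):
        # A real processor would run its `_clean()` hook here
        return False  # never swallow exceptions

    def process(self):
        return [self.handler(record) for record in self.records]


processor = SketchProcessor()
records = [{"body": "a"}, {"body": "b"}]

with processor(records, lambda r: r["body"].upper()):
    result = processor.process()
# result == ["A", "B"]
```

Because `__call__` returns the processor itself, the `as ctx` alias removed by this commit was redundant: `ctx` and `processor` referred to the same object.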

aws_lambda_powertools/utilities/batch/sqs.py

Lines changed: 2 additions & 2 deletions

@@ -37,8 +37,8 @@ class PartialSQSProcessor(BasePartialProcessor):
     >>> records = event["Records"]
     >>> processor = PartialSQSProcessor()
     >>>
-    >>> with processor(records=records, handler=record_handler) as ctx:
-    >>>     result = ctx.process()
+    >>> with processor(records=records, handler=record_handler):
+    >>>     result = processor.process()
     >>>
     >>> # In case a partial failure occurred, all successful executions
     >>> # have been deleted from the queue after the context's exit.
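The docstring above says successful executions are deleted from the queue on exit. A hedged sketch of what that cleanup step could look like, using the standard Lambda SQS event record fields (`messageId`, `receiptHandle`) and the `Id`/`ReceiptHandle` shape that `sqs:DeleteMessageBatch` expects; the helper name is hypothetical and the real `_clean` implementation may differ.

```python
from typing import List


def build_delete_entries(success_records: List[dict]) -> List[dict]:
    """Build the Entries argument for sqs:DeleteMessageBatch from records
    that were processed successfully (records use the Lambda SQS event shape)."""
    return [
        {"Id": record["messageId"], "ReceiptHandle": record["receiptHandle"]}
        for record in success_records
    ]


# Hypothetical records, in the shape Lambda delivers for an SQS event source
success = [
    {"messageId": "1", "receiptHandle": "rh-1", "body": "ok"},
    {"messageId": "2", "receiptHandle": "rh-2", "body": "ok"},
]

entries = build_delete_entries(success)
# A real processor would then call:
# sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
```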

docs/content/index.mdx

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ sam init --location https://github.com/aws-samples/cookiecutter-aws-sam-python
 * [Metrics](./core/metrics) - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF)
 * [Bring your own middleware](./utilities/middleware_factory) - Decorator factory to create your own middleware to run logic before, and after each Lambda invocation
 * [Parameters utility](./utilities/parameters) - Retrieve parameter values from AWS Systems Manager Parameter Store, AWS Secrets Manager, or Amazon DynamoDB, and cache them for a specific amount of time
-* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, with a middleware allow custom record handling
+* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, with handling of partial failures

 ### Lambda Layer

docs/content/utilities/batch.mdx

Lines changed: 69 additions & 34 deletions

@@ -5,14 +5,15 @@ description: Utility

 import Note from "../../src/components/Note"

-The batch utility provides an abstraction to process a batch event. Useful for lambda integrations with [AWS SQS](https://aws.amazon.com/sqs/), [AWS Kinesis](https://aws.amazon.com/kinesis/) and [AWS DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
-It also provides base classes (`BaseProcessor`, `BasePartialProcessor`) allowing you to create your **own** batch processor.
+One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers offer a feature called "batch processing", in which a [predefined number][2] of events is sent to the Lambda function at once.
+
+The batch utility provides an abstraction to process batch events, with base classes (`BaseProcessor`, `BasePartialProcessor`) that allow you to create your **own** batch processor.
+It also provides a ready-to-use implementation to handle partial batch failures from the SQS provider.

 **Key Features**

-* Run batch processing logic with a clean interface;
-* Middleware and context to handle a batch event;
-* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure.
+* Removal of successful messages from an [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure;
+* Build your own batch processor using the base classes.

 **IAM Permissions**

@@ -24,13 +25,32 @@ PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch`

 ### PartialSQSProcessor

-A special batch processor which aims to `clean` your SQS:Queue if one or more (not all) records of the batch fails.
-A batch's partial failure sends back all the records to the queue, reprocessing this batch until all records succeed.
-This processor exists to improve performance in such cases, deleting successful messages of a batch with partial failure.
+The SQS integration with Lambda is one of the most well established and is particularly useful when building asynchronous applications. A common approach to maximize the performance of this integration is to enable batch processing, resulting in higher throughput with fewer invocations.
+
+As with any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the **whole** batch to the queue again, reprocessing all records, including the successful ones. Depending on your [configuration][3], this cycle can repeat until the whole batch succeeds or the maximum number of attempts is reached. Your application may face problems with such behavior, especially if it is not idempotent.
+
+A *naive* approach to solving this problem is to delete successful records from the queue before the redrive phase. The `PartialSQSProcessor` class offers this solution both as a context manager and as a middleware, removing all successful messages from the queue in case one or more failures occurred during the Lambda's execution. The two examples below display the behavior of this class.
+
+**Examples:**
+
+```python:title=context_manager.py
+from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+
+def record_handler(record):
+    return record["body"]
+
+def lambda_handler(event, context):
+    records = event["Records"]
+    processor = PartialSQSProcessor()
+
+    # highlight-start
+    with processor(records, record_handler):
+        result = processor.process()
+    # highlight-end

-### Middleware
+    return result
+```

-```python:title=app.py
+```python:title=middleware.py
 from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor

 def record_handler(record):
@@ -48,48 +68,63 @@ def lambda_handler(event, context):
 You can create your own batch processor by inheriting the `BaseProcessor` class and implementing `_prepare()`, `_clean()` and `_process_record()`.
 It's also possible to inherit from `BasePartialProcessor`, which contains additional logic to handle a partial failure and keep track of record status.

-Here is an example implementation of a DynamoDBStream custom processor:
+**Example:**

 ```python:title=custom_processor.py
-import json
+from uuid import uuid4

 from aws_lambda_powertools.utilities.batch import BaseProcessor, batch_processor
 import boto3

-class DynamoDBProcessor(BaseProcessor):
+def record_handler(record):
+    return {"Id": str(uuid4()), "MessageBody": record["body"]}
+
+
+class DDBStreamProcessor(BaseProcessor):
+    """
+    1. Listens to streams from table A;
+    2. Processes each record;
+    3. Sends a batch message to a queue with the results.
+
+    Parameters
+    ----------
+    queue_name: str
+        Name of the queue to send the results to
+    """

-    def __init__(self, queue_url: str):
-        self.queue_url = queue_url
+    def __init__(self, queue_name: str):
+        self.queue_name = queue_name
+        self.queue_url = None
         self.client = boto3.client("sqs")
+        self.results = []

     def _prepare(self):
-        pass
+        # Called once, *before* processing
+        # Builds the queue_url from the queue name
+        # E.g.:
+        self.queue_url = f"https://queue.amazonaws.com/123456789012/{self.queue_name}"

     def _clean(self):
-        pass
+        # Called once, *after* all records are processed (when the context manager exits)
+        # Here we send all messages to the queue at once, and clear 'results' for future invocations
+        # E.g.:
+        self.client.send_message_batch(QueueUrl=self.queue_url, Entries=self.results)
+        self.results.clear()

     def _process_record(self, record):
-        """
-        Process record and send result to sqs
-        """
+        # Defines how each record is processed
+        # Here we store the result of each record in a list
+        # E.g.:
         result = self.handler(record)
-        body = json.dumps(result)
-        self.client.send_message(QueueUrl=self.queue_url, MessageBody=body)
+        self.results.append(result)
         return result

-def record_handler(record):
-    return record["Keys"]
-
-# As context
-
-processor = DynamoDBProcessor("dummy")
-records = {"Records": []}

-with processor(records=records, handler=record_handler) as ctx:
-    result = ctx.process()
-
-# As middleware
-@batch.batch_processor(record_handler=record_handler, processor=DynamoDBProcessor("dummy"))
+@batch_processor(record_handler=record_handler, processor=DDBStreamProcessor("dummy-queue"))
 def lambda_handler(event, context):
     return {"statusCode": 200}
 ```
+
+[1]: https://aws.amazon.com/eventbridge/integrations/
+[2]: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html
+[3]: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
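The redrive behavior these docs describe can be illustrated with a small self-contained simulation: under native SQS behavior the whole failed batch returns to the queue, while a partial processor deletes the successes first so only the failures return. The function and handler names below are hypothetical, for illustration only.

```python
from typing import Callable, List, Tuple


def process_batch(records: List[dict], handler: Callable) -> Tuple[List[dict], List[dict]]:
    """Split a batch into successes and failures, mimicking what a
    partial processor tracks internally."""
    successes, failures = [], []
    for record in records:
        try:
            handler(record)
            successes.append(record)
        except Exception:
            failures.append(record)
    return successes, failures


def flaky_handler(record):
    # Simulated per-record work that fails for one record
    if record["body"] == "bad":
        raise ValueError("cannot process")
    return record["body"]


batch = [{"body": "ok-1"}, {"body": "bad"}, {"body": "ok-2"}]
successes, failures = process_batch(batch, flaky_handler)

# Native SQS behavior: the *whole* batch is redriven, successes included
redriven_native = batch
# With partial cleanup: successes are deleted first, only failures return
redriven_partial = failures
```

In this toy run three records would be reprocessed under native behavior but only one under partial cleanup, which is the performance gain the `PartialSQSProcessor` targets.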
