## Description

### Key information

- RFC PR: feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis powertools-lambda-python#886
- Related issue(s), if known:
- Area: Batch processing
- Meet tenets: Yes
- Approved by: @heitorlessa
- Reviewed by: @heitorlessa
## Summary

A new generic batch processing utility that can process records from SQS, Kinesis Data Streams, and DynamoDB Streams, and handle reporting of partial batch failures.
## Motivation

With the launch of support for partial batch responses for Lambda/SQS, the event source mapping can now natively handle partial failures in a batch, removing the need for calls to the SQS DeleteMessage API. This support already exists for Kinesis and DynamoDB Streams.
## Proposal

The new utility will be implemented in the namespace that already exists for the SQS batch processing utility: `aws_lambda_powertools.utilities.batch`. Rather than calling the SQS DeleteMessage API like the existing batch utility, this version will instead inject `BatchItemFailures` (or `batchItemFailures` for Kinesis and DynamoDB) into the Lambda response. We will expose, at the very least, a new decorator, `batch_processor`, which will accept an event type depending on the integration (Kinesis, SQS, or DynamoDB Streams). This will look similar to the Event Handler design. There will be a boolean setting to handle partial failures, defaulting to `True` (users will still need to enable this in the event source mapping for it to work).
```python
from aws_lambda_powertools.utilities.batch import EventType, batch_processor


def record_handler(record):
    return do_something_with(record["body"])


@batch_processor(record_handler=record_handler, event_type=EventType.KinesisStreamEvent)
def lambda_handler(event, context):
    return {"statusCode": 200}
```
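For reference, the payload the utility would inject can be sketched as below. The `build_partial_batch_response` helper name is hypothetical, but the payload shape follows the Lambda partial batch response contract: a `batchItemFailures` list with one `itemIdentifier` per failed record.

```python
def build_partial_batch_response(failed_item_ids):
    """Build a Lambda partial batch response payload.

    Each entry identifies a record that failed processing, so the event
    source mapping retries only those records instead of the whole batch.
    """
    return {
        "batchItemFailures": [
            {"itemIdentifier": item_id} for item_id in failed_item_ids
        ]
    }


# Example: two records (identified by placeholder sequence numbers) failed
response = build_partial_batch_response(["seq-1", "seq-2"])
# response == {"batchItemFailures": [{"itemIdentifier": "seq-1"},
#                                    {"itemIdentifier": "seq-2"}]}
```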
Proposed class/method names for the new API:

- `aws_lambda_powertools.utilities.batch.base.BatchProcessor`
- `aws_lambda_powertools.utilities.batch.batch_processor`
## Any configuration or corner cases you'd expect?

Users will need to enable this in the event source mapping when configuring the Lambda trigger; reporting partial failures will not work with changes to the Lambda code alone. Investigation is needed to better understand the consequences here, and how expensive it would be to add a check in the code for whether it is enabled. If we don't implement such a check, we need to call this out front and center in the documentation.
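One possible shape for such a check: the Lambda ListEventSourceMappings API returns mappings with a `FunctionResponseTypes` field, which contains `"ReportBatchItemFailures"` when partial batch responses are enabled. The helper below is a sketch that only inspects the returned mapping dict; the boto3 call itself (and its cost/latency implications) is left out.

```python
def reports_batch_item_failures(event_source_mapping: dict) -> bool:
    """Check whether an event source mapping configuration (as returned by
    the Lambda ListEventSourceMappings API) has partial batch responses
    enabled via the ReportBatchItemFailures function response type."""
    response_types = event_source_mapping.get("FunctionResponseTypes") or []
    return "ReportBatchItemFailures" in response_types


# Example mapping dict, trimmed to the relevant field
mapping = {"FunctionResponseTypes": ["ReportBatchItemFailures"]}
print(reports_batch_item_failures(mapping))  # prints True
```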
## Rationale and alternatives

The main consideration here is where the new functionality fits into the Powertools package. It could be a new top-level utility, e.g. `aws_lambda_powertools.batchv2`, but we prefer not to add version numbers as they are confusing for users.

We could make a straight replacement of the implementation behind the existing API, which was the initial idea. However, the native functionality requires a setting when the event source mapping is created. That means we'd be introducing a breaking change if we did this: the utility would stop working for users who had not made this configuration.
## Unresolved questions

We need to decide whether to add a context manager like the existing implementation has. It is simple to implement, but it burdens the user with ensuring they store the return value and use it in their Lambda response. I feel this is too likely to be misunderstood, but would like opinions. Example:
```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType


def record_handler(record):
    return do_something_with(record["body"])


def lambda_handler(event, context):
    records = event["Records"]
    processor = BatchProcessor(event_type=EventType.KinesisStreamEvent)
    with processor(records, record_handler) as proc:
        result = proc.process()  # Users will have to store the response from the processor
    return result  # Users will have to return the result they stored from the processor
```