RFC: New batch processor for new native partial response (SQS, DynamoDB, Kinesis) #64

Key information

Summary

A new generic batch processing utility that can process records from SQS, Kinesis Data Streams, and DynamoDB Streams, and report partial batch failures.

Motivation

With the launch of support for partial batch responses for Lambda/SQS, the event source mapping can now natively handle partial failures in a batch, removing the need for calls to the SQS DeleteMessage API. This support already exists for Kinesis and DynamoDB streams.

Proposal

The new utility will be implemented in the namespace that already exists for the SQS Batch Processing utility: aws_lambda_powertools.utilities.batch. Rather than calling the SQS DeleteMessage API like the existing batch utility, this version will instead inject BatchItemFailures (or batchItemFailures for Kinesis and DDB) into the Lambda response. We will expose, at the least, a new batch_processor decorator, which will accept an event type depending on the integration (Kinesis, SQS, or DDB Streams). This will look similar to the Event Handler design. There will be a boolean setting to handle partial failures, defaulting to True (users will still need to enable this in the event source mapping for it to work).

from aws_lambda_powertools.utilities.batch import batch_processor, EventType


def record_handler(record):
    return do_something_with(record["body"])


@batch_processor(record_handler=record_handler, event_type=EventType.KinesisStreamEvent)
def lambda_handler(event, context):
    return {"statusCode": 200} 

Proposed class/method names for the new API:
aws_lambda_powertools.utilities.batch.base.BatchProcessor
aws_lambda_powertools.utilities.batch.batch_processor

Any configuration or corner cases you'd expect?
Users will need to enable this in the event source mapping when configuring the Lambda trigger; reporting partial failures will not work with changes to the Lambda code alone. Investigation is needed to better understand the consequences here, and how expensive it would be to check at runtime whether the setting is enabled. If we don't implement such a check, we need to call this out front and center in the documentation.
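
For context, the enablement itself lives outside Powertools: the event source mapping must be created (or updated) with ReportBatchItemFailures among its function response types. A minimal boto3 sketch, with placeholder stream ARN and function name:

import boto3

lambda_client = boto3.client("lambda")

# Without ReportBatchItemFailures, Lambda ignores a batchItemFailures field
# in the response and treats any raised error as a whole-batch failure.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-east-1:111122223333:stream/my-stream",  # placeholder
    FunctionName="my-function",  # placeholder
    StartingPosition="LATEST",
    FunctionResponseTypes=["ReportBatchItemFailures"],
)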

Rationale and alternatives

The main consideration here is where the new functionality fits into the Powertools package. It could be a new top-level utility, e.g. aws_lambda_powertools.batchv2, but we prefer not to add version numbers, as they are confusing for users.

We could make a straight replacement of the implementation behind the existing API, which was the initial idea. However, the native functionality requires a setting when the event source mapping is created, so this would be a breaking change: the utility would stop working for users who had not made that configuration change.

Unresolved questions

We need to decide whether to add a context manager like the one in the existing implementation. It is simple to implement, but it burdens the user with ensuring they store the return value and use it in their Lambda response. I feel this is too likely to be misunderstood, but would like opinions. Example:

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

def record_handler(record):
    return do_something_with(record["body"])

def lambda_handler(event, context):
    records = event["Records"]

    processor = BatchProcessor(event_type=EventType.KinesisStreamEvent)

    with processor(records, record_handler) as proc:
        result = proc.process()  # Users will have to store the response from the processor

    return result  # Users will have to return the result they stored from the processor
