Skip to content

feat(batch): reduce boilerplate with process_partial_response #2090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions aws_lambda_powertools/utilities/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
EventType,
FailureResponse,
SuccessResponse,
)
from aws_lambda_powertools.utilities.batch.decorators import (
async_batch_processor,
async_process_partial_response,
batch_processor,
process_partial_response,
)
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
Expand All @@ -22,6 +26,10 @@
from aws_lambda_powertools.utilities.batch.types import BatchTypeModels

__all__ = (
"async_batch_processor",
"async_process_partial_response",
"batch_processor",
"process_partial_response",
"BatchProcessor",
"AsyncBatchProcessor",
"BasePartialProcessor",
Expand All @@ -32,6 +40,4 @@
"FailureResponse",
"SuccessResponse",
"SqsFifoPartialProcessor",
"batch_processor",
"async_batch_processor",
)
107 changes: 1 addition & 106 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,8 @@
import sys
from abc import ABC, abstractmethod
from enum import Enum
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
Union,
overload,
)
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.utilities.batch.exceptions import (
BatchProcessingError,
Expand Down Expand Up @@ -513,51 +502,6 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons
return self.failure_handler(record=data, exception=sys.exc_info())


@lambda_handler_decorator
def batch_processor(
handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor
):
"""
Middleware to handle batch event processing

Parameters
----------
handler: Callable
Lambda's handler
event: Dict
Lambda's Event
context: LambdaContext
Lambda's Context
record_handler: Callable
Callable or corutine to process each record from the batch
processor: BatchProcessor
Batch Processor to handle partial failure cases

Examples
--------
**Processes Lambda's event with a BasePartialProcessor**

>>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
>>>
>>> def record_handler(record):
>>> return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=BatchProcessor())
>>> def handler(event, context):
>>> return {"StatusCode": 200}

Limitations
-----------
* Async batch processors. Use `async_batch_processor` instead.
"""
records = event["Records"]

with processor(records, record_handler, lambda_context=context):
processor.process()

return handler(event, context)


class AsyncBatchProcessor(BasePartialBatchProcessor):
"""Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously.

Expand Down Expand Up @@ -694,52 +638,3 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa
return self._register_model_validation_error_record(record)
except Exception:
return self.failure_handler(record=data, exception=sys.exc_info())


@lambda_handler_decorator
def async_batch_processor(
handler: Callable,
event: Dict,
context: LambdaContext,
record_handler: Callable[..., Awaitable[Any]],
processor: AsyncBatchProcessor,
):
"""
Middleware to handle batch event processing
Parameters
----------
handler: Callable
Lambda's handler
event: Dict
Lambda's Event
context: LambdaContext
Lambda's Context
record_handler: Callable[..., Awaitable[Any]]
Callable to process each record from the batch
processor: AsyncBatchProcessor
Batch Processor to handle partial failure cases
Examples
--------
**Processes Lambda's event with a BasePartialProcessor**
>>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor
>>>
>>> async def async_record_handler(record):
>>> payload: str = record.body
>>> return payload
>>>
>>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
>>>
>>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
>>> async def lambda_handler(event, context: LambdaContext):
>>> return processor.response()

Limitations
-----------
* Sync batch processors. Use `batch_processor` instead.
"""
records = event["Records"]

with processor(records, record_handler, lambda_context=context):
processor.async_process()

return handler(event, context)
Loading