feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis #886
Changes from 13 commits
```diff
@@ -3,24 +3,62 @@
 """
 Batch processing utilities
 """
 
 import logging
+import sys
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, List, Tuple
+from enum import Enum
+from types import TracebackType
+from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload
 
 from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
+from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
+from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
 
 logger = logging.getLogger(__name__)
 
 
+class EventType(Enum):
+    SQS = "SQS"
+    KinesisDataStreams = "KinesisDataStreams"
+    DynamoDBStreams = "DynamoDBStreams"
+
+
+#
+# type specifics
+#
+has_pydantic = "pydantic" in sys.modules
+_ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
+_OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]]
+
+# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
+# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
+if has_pydantic:
+    from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel
+    from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel
+    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
+
+    BatchTypeModels = Optional[
+        Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
+    ]
+
+# When using the processor with default arguments, records will carry EventSourceDataClassTypes
+# and depending on what EventType it's passed it'll correctly map to the right record
+# When using Pydantic Models, it'll accept any subclass of the models above
+EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]
+BatchEventTypes = Union[EventSourceDataClassTypes, "BatchTypeModels"]
+SuccessCallback = Tuple[str, Any, BatchEventTypes]
+FailureCallback = Tuple[str, str, BatchEventTypes]
+
+
 class BasePartialProcessor(ABC):
     """
     Abstract class for batch processors.
     """
 
     def __init__(self):
-        self.success_messages: List = []
-        self.fail_messages: List = []
+        self.success_messages: List[BatchEventTypes] = []
+        self.fail_messages: List[BatchEventTypes] = []
         self.exceptions: List = []
 
     @abstractmethod
```
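The pydantic import guard above is why `BatchTypeModels` is quoted as a forward reference in later annotations: the alias only exists when pydantic is already loaded. As the comment notes, parser models have to be used as subclasses so the processor can still reach `message_id` or sequence numbers via dot notation. A minimal sketch of that pattern, assuming pydantic v1-style validators as used by the parser utility at the time; the `Order` payload and `OrderSqsRecord` names are hypothetical:

```python
import json

from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel


class Order(BaseModel):
    # hypothetical business payload, for illustration only
    item: str
    quantity: int


class OrderSqsRecord(SqsRecordModel):
    body: Order  # narrow the raw SQS body string to a typed payload

    @validator("body", pre=True)
    def transform_body_to_order(cls, value: str):
        # SQS delivers the body as a JSON string; decode it before validation
        return json.loads(value)
```

Passing `model=OrderSqsRecord` to the new `BatchProcessor` below would then hand the record handler fully parsed records instead of raw dicts.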
```diff
@@ -38,7 +76,7 @@ def _clean(self):
         raise NotImplementedError()
 
     @abstractmethod
-    def _process_record(self, record: Any):
+    def _process_record(self, record: dict):
         """
         Process record with handler.
         """
```
```diff
@@ -57,13 +95,13 @@ def __enter__(self):
     def __exit__(self, exception_type, exception_value, traceback):
         self._clean()
 
-    def __call__(self, records: List[Any], handler: Callable):
+    def __call__(self, records: List[dict], handler: Callable):
         """
         Set instance attributes before execution
 
         Parameters
         ----------
-        records: List[Any]
+        records: List[dict]
             List with objects to be processed.
         handler: Callable
             Callable to process "records" entries.
```
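Since `__call__` stores the records and handler on the instance, and `__enter__`/`__exit__` wrap `_prepare`/`_clean`, any subclass can also be driven manually as a context manager rather than through the `batch_processor` decorator. A sketch using the `BatchProcessor` class added later in this diff; the top-level import path and the handler body are assumptions:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    # raising an exception here marks the record as failed
    return record.body


def lambda_handler(event: dict, context):
    with processor(records=event["Records"], handler=record_handler) as proc:
        proc.process()  # failures are collected, not re-raised
    return processor.response()
```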
```diff
@@ -72,7 +110,7 @@ def __call__(self, records: List[Any], handler: Callable):
         self.handler = handler
         return self
 
-    def success_handler(self, record: Any, result: Any):
+    def success_handler(self, record, result: Any) -> SuccessCallback:
         """
         Success callback
 
```
```diff
@@ -85,7 +123,7 @@ def success_handler(self, record: Any, result: Any):
         self.success_messages.append(record)
         return entry
 
-    def failure_handler(self, record: Any, exception: Tuple):
+    def failure_handler(self, record, exception: _OptExcInfo) -> FailureCallback:
         """
         Failure callback
 
```
```diff
@@ -146,3 +184,108 @@ def batch_processor(
     processor.process()
 
     return handler(event, context)
+
+
+class BatchProcessor(BasePartialProcessor):
```
Review thread on `class BatchProcessor(BasePartialProcessor):`

> **Reviewer:** Would it make sense to have separate processors for each event type (SQS, DynamoDB, or Kinesis) instead of growing the complexity of this class? Then you could encapsulate the failure collection in the specific processor.

> **Author:** That was the initial version we wanted to implement: a KinesisDataStreamProcessor, a DynamoDB one, and so on. Then @cakepietoast argued that this would confuse customers alongside the other available processors (Sqs, PartialProcessor, BaseProcessor), as we can only deprecate those in v2. I'm 50/50 here, if I'm honest. I prefer separate classes, but I can also see customers easily confused about which one to pick despite the docs change I'm going to make. Implementation-wise, this will remain stable. The only two changes I can anticipate are 1/ supporting the new Permanent Exception parameter, and 2/ raising a descriptive exception in case we hit an AttributeError when collecting the message ID or sequence number from a malformed event/model.

> **Reviewer:** What if we changed the nomenclature to "producer" and "consumer" (these processors would be consumers)? I had that other idea earlier to make it easier to use the SQS and DynamoDB batch write methods while taking their batch sizes into account; those could be "producers" 🤷‍♂️

> **Author:** Where would the producer sit, and what would its responsibilities be? As for the partitioning suggestion, we should add it to the Event Source Data Class utility, as it's a no-brainer there. I don't think the word "consumer" would be explicit enough about the capabilities Batch provides. Maybe something else, then?
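For readers following the thread, the difference between the two designs shows up at the call site. A quick sketch; `KinesisDataStreamProcessor` is the hypothetical name from the discussion, not shipped code:

```python
# Design shipped in this PR: one class, behaviour selected via EventType
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)

# Debated alternative: one subclass per event source, each encapsulating
# its own failure collection (hypothetical, never implemented)
# processor = KinesisDataStreamProcessor()
```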
```diff
+
+    DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}
+
+    def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
+        """Process batch and partially report failed items
+
+        Parameters
+        ----------
+        event_type: EventType
+            Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
+        model: Optional["BatchTypeModels"]
+            Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel or KinesisDataStreamRecordModel
+        """
+        self.event_type = event_type
+        self.model = model
+        self.batch_response = self.DEFAULT_RESPONSE
+
+        self._COLLECTOR_MAPPING = {
+            EventType.SQS: self._collect_sqs_failures,
+            EventType.KinesisDataStreams: self._collect_kinesis_failures,
+            EventType.DynamoDBStreams: self._collect_dynamodb_failures,
+        }
+        self._DATA_CLASS_MAPPING = {
+            EventType.SQS: SQSRecord,
+            EventType.KinesisDataStreams: KinesisStreamRecord,
+            EventType.DynamoDBStreams: DynamoDBRecord,
+        }
+
+        super().__init__()
+
+    def response(self):
+        """Batch items that failed processing, if any"""
+        return self.batch_response
+
+    def _prepare(self):
+        """
+        Remove results from previous execution.
+        """
+        self.success_messages.clear()
+        self.fail_messages.clear()
+        self.batch_response = self.DEFAULT_RESPONSE
+
+    def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallback]:
+        """
+        Process a record with instance's handler
+
+        Parameters
+        ----------
+        record: dict
+            A batch record to be processed.
+        """
+        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
+        try:
+            result = self.handler(record=data)
+            return self.success_handler(record=record, result=result)
+        except Exception:
+            return self.failure_handler(record=data, exception=sys.exc_info())
+
+    def _clean(self):
+        """
+        Report messages to be deleted in case of partial failure.
+        """
+
+        if not self._has_messages_to_report():
+            return
+
+        messages = self._get_messages_to_report()
+        self.batch_response = {"batchItemFailures": messages}
+
+    def _has_messages_to_report(self) -> bool:
+        if self.fail_messages:
+            return True
+
+        logger.debug(f"All {len(self.success_messages)} records successfully processed")
+        return False
+
+    def _get_messages_to_report(self) -> List[Dict[str, str]]:
+        """
+        Format messages to use in batch deletion
+        """
+        return self._COLLECTOR_MAPPING[self.event_type]()
+
+    def _collect_sqs_failures(self):
+        return [{"itemIdentifier": msg.message_id} for msg in self.fail_messages]
+
+    def _collect_kinesis_failures(self):
+        return [{"itemIdentifier": msg.kinesis.sequence_number} for msg in self.fail_messages]
+
+    def _collect_dynamodb_failures(self):
+        return [{"itemIdentifier": msg.dynamodb.sequence_number} for msg in self.fail_messages]
+
+    @overload
+    def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
+        ...
+
+    @overload
+    def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes:
+        ...
+
+    def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
+        if model is not None:
+            return model.parse_obj(record)
+        else:
+            return self._DATA_CLASS_MAPPING[event_type](record)
```
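Putting it all together, a minimal end-to-end sketch of how the new class composes with the existing `batch_processor` middleware; the top-level import path is an assumption about the utility's public exports:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    # any exception raised here lands this record in fail_messages
    return record.body


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context):
    # e.g. {"batchItemFailures": [{"itemIdentifier": "<message id>"}]}
    return processor.response()
```

Note that Lambda only retries the reported identifiers when the event source mapping has `ReportBatchItemFailures` enabled; without it, returning this payload has no effect and the whole batch is retried on failure.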