Skip to content

Commit 09257ba

Browse files
committed
feat: draft implementation
1 parent 3217097 commit 09257ba

File tree

1 file changed

+27
-5
lines changed
  • aws_lambda_powertools/utilities/batch

1 file changed

+27
-5
lines changed

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,28 @@
88
from abc import ABC, abstractmethod
99
from enum import Enum
1010
from types import TracebackType
11-
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
11+
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload
1212

1313
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
1414
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
1515
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
1616
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
1717

1818
logger = logging.getLogger(__name__)
19+
has_pydantic = "pydantic" in sys.modules
20+
1921
SuccessCallback = Tuple[str, Any, dict]
2022
FailureCallback = Tuple[str, str, dict]
21-
2223
_ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
2324
_OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]]
2425

26+
if has_pydantic:
27+
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel
28+
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel
29+
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
30+
31+
BatchTypeModels = Union[SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecordModel]
32+
2533

2634
class EventType(Enum):
2735
SQS = "SQS"
@@ -167,15 +175,18 @@ def batch_processor(
167175
class BatchProcessor(BasePartialProcessor):
168176
DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}
169177

170-
def __init__(self, event_type: EventType):
178+
def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
171179
"""Process batch and partially report failed items
172180
173181
Parameters
174182
----------
175183
event_type: EventType
176184
Whether this is an SQS, DynamoDB Streams, or Kinesis Data Stream event
185+
model: Optional["BatchTypeModels"]
186+
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, or KinesisDataStreamRecordModel
177187
"""
178188
self.event_type = event_type
189+
self.model = model
179190
self.batch_response = self.DEFAULT_RESPONSE
180191
self._COLLECTOR_MAPPING = {
181192
EventType.SQS: self._collect_sqs_failures,
@@ -212,7 +223,7 @@ def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallbac
212223
A batch record to be processed.
213224
"""
214225
try:
215-
data = self._to_batch_type(record, event_type=self.event_type)
226+
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
216227
result = self.handler(record=data)
217228
return self.success_handler(record=record, result=result)
218229
except Exception:
@@ -251,7 +262,18 @@ def _collect_kinesis_failures(self):
251262
def _collect_dynamodb_failures(self):
252263
return {"itemIdentifier": msg["dynamodb"]["SequenceNumber"] for msg in self.fail_messages}
253264

265+
@overload
266+
def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
267+
...
268+
269+
@overload
254270
def _to_batch_type(
255271
self, record: dict, event_type: EventType
256272
) -> Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]:
257-
return self._DATA_CLASS_MAPPING[event_type](record) # type: ignore # since DictWrapper inference is incorrect
273+
...
274+
275+
def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
276+
if model:
277+
return model.parse_obj(record)
278+
else:
279+
return self._DATA_CLASS_MAPPING[event_type](record)

0 commit comments

Comments
 (0)