16
16
from aws_lambda_powertools .utilities .data_classes .sqs_event import SQSRecord
17
17
18
18
logger = logging .getLogger (__name__ )
19
- has_pydantic = "pydantic" in sys .modules
20
19
21
- SuccessCallback = Tuple [str , Any , dict ]
22
- FailureCallback = Tuple [str , str , dict ]
20
+
21
+ class EventType (Enum ):
22
+ SQS = "SQS"
23
+ KinesisDataStreams = "KinesisDataStreams"
24
+ DynamoDBStreams = "DynamoDBStreams"
25
+
26
+
27
+ #
28
+ # type specifics
29
+ #
30
+ has_pydantic = "pydantic" in sys .modules
23
31
_ExcInfo = Tuple [Type [BaseException ], BaseException , TracebackType ]
24
32
_OptExcInfo = Union [_ExcInfo , Tuple [None , None , None ]]
25
33
34
+ # For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
35
+ # We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
26
36
if has_pydantic :
27
37
from aws_lambda_powertools .utilities .parser .models import DynamoDBStreamRecordModel
28
38
from aws_lambda_powertools .utilities .parser .models import KinesisDataStreamRecord as KinesisDataStreamRecordModel
32
42
Union [Type [SqsRecordModel ], Type [DynamoDBStreamRecordModel ], Type [KinesisDataStreamRecordModel ]]
33
43
]
34
44
35
-
36
- class EventType (Enum ):
37
- SQS = "SQS"
38
- KinesisDataStreams = "KinesisDataStreams"
39
- DynamoDBStreams = "DynamoDBStreams"
45
+ # When using processor with default arguments, records will carry EventSourceDataClassTypes
46
+ # and depending on what EventType it's passed it'll correctly map to the right record
47
+ # When using Pydantic Models, it'll accept any
48
+ EventSourceDataClassTypes = Union [SQSRecord , KinesisStreamRecord , DynamoDBRecord ]
49
+ BatchEventTypes = Union [EventSourceDataClassTypes , "BatchTypeModels" ]
50
+ SuccessCallback = Tuple [str , Any , BatchEventTypes ]
51
+ FailureCallback = Tuple [str , str , BatchEventTypes ]
40
52
41
53
42
54
class BasePartialProcessor (ABC ):
@@ -45,8 +57,8 @@ class BasePartialProcessor(ABC):
45
57
"""
46
58
47
59
def __init__ (self ):
48
- self .success_messages : List = []
49
- self .fail_messages : List = []
60
+ self .success_messages : List [ BatchEventTypes ] = []
61
+ self .fail_messages : List [ BatchEventTypes ] = []
50
62
self .exceptions : List = []
51
63
52
64
@abstractmethod
@@ -98,7 +110,7 @@ def __call__(self, records: List[dict], handler: Callable):
98
110
self .handler = handler
99
111
return self
100
112
101
- def success_handler (self , record : dict , result : Any ) -> SuccessCallback :
113
+ def success_handler (self , record , result : Any ) -> SuccessCallback :
102
114
"""
103
115
Success callback
104
116
@@ -111,7 +123,7 @@ def success_handler(self, record: dict, result: Any) -> SuccessCallback:
111
123
self .success_messages .append (record )
112
124
return entry
113
125
114
- def failure_handler (self , record : dict , exception : _OptExcInfo ) -> FailureCallback :
126
+ def failure_handler (self , record , exception : _OptExcInfo ) -> FailureCallback :
115
127
"""
116
128
Failure callback
117
129
@@ -256,22 +268,20 @@ def _get_messages_to_report(self) -> Dict[str, str]:
256
268
return self ._COLLECTOR_MAPPING [self .event_type ]()
257
269
258
270
def _collect_sqs_failures (self ):
259
- return {"itemIdentifier" : msg [ " messageId" ] for msg in self .fail_messages }
271
+ return {"itemIdentifier" : msg . messageId for msg in self .fail_messages }
260
272
261
273
def _collect_kinesis_failures (self ):
262
- return {"itemIdentifier" : msg [ " kinesis" ][ "sequenceNumber" ] for msg in self .fail_messages }
274
+ return {"itemIdentifier" : msg . kinesis . sequence_number for msg in self .fail_messages }
263
275
264
276
def _collect_dynamodb_failures (self ):
265
- return {"itemIdentifier" : msg [ " dynamodb" ][ "SequenceNumber" ] for msg in self .fail_messages }
277
+ return {"itemIdentifier" : msg . dynamodb . sequence_number for msg in self .fail_messages }
266
278
267
279
@overload
268
280
def _to_batch_type (self , record : dict , event_type : EventType , model : "BatchTypeModels" ) -> "BatchTypeModels" :
269
281
...
270
282
271
283
@overload
272
- def _to_batch_type (
273
- self , record : dict , event_type : EventType
274
- ) -> Union [SQSRecord , KinesisStreamRecord , DynamoDBRecord ]:
284
+ def _to_batch_type (self , record : dict , event_type : EventType ) -> EventSourceDataClassTypes :
275
285
...
276
286
277
287
def _to_batch_type (self , record : dict , event_type : EventType , model : Optional ["BatchTypeModels" ] = None ):
0 commit comments