7
7
import sys
8
8
from abc import ABC , abstractmethod
9
9
from enum import Enum
10
- from typing import Any , Callable , Dict , List , Optional , Tuple
10
+ from types import TracebackType
11
+ from typing import Any , Callable , Dict , List , Optional , Tuple , Type , Union
11
12
12
13
from aws_lambda_powertools .middleware_factory import lambda_handler_decorator
13
14
from aws_lambda_powertools .utilities .data_classes .dynamo_db_stream_event import DynamoDBRecord
14
15
from aws_lambda_powertools .utilities .data_classes .kinesis_stream_event import KinesisStreamRecord
15
16
from aws_lambda_powertools .utilities .data_classes .sqs_event import SQSRecord
16
17
17
18
logger = logging .getLogger (__name__ )
19
+ SuccessCallback = Tuple [str , Any , dict ]
20
+ FailureCallback = Tuple [str , str , dict ]
21
+
22
+ _ExcInfo = Tuple [Type [BaseException ], BaseException , TracebackType ]
23
+ _OptExcInfo = Union [_ExcInfo , Tuple [None , None , None ]]
18
24
19
25
20
26
class EventType (Enum ):
@@ -48,7 +54,7 @@ def _clean(self):
48
54
raise NotImplementedError ()
49
55
50
56
@abstractmethod
51
- def _process_record (self , record : Any ):
57
+ def _process_record (self , record : dict ):
52
58
"""
53
59
Process record with handler.
54
60
"""
@@ -67,13 +73,13 @@ def __enter__(self):
67
73
def __exit__ (self , exception_type , exception_value , traceback ):
68
74
self ._clean ()
69
75
70
- def __call__ (self , records : List [Any ], handler : Callable ):
76
+ def __call__ (self , records : List [dict ], handler : Callable ):
71
77
"""
72
78
Set instance attributes before execution
73
79
74
80
Parameters
75
81
----------
76
- records: List[Any ]
82
+ records: List[dict ]
77
83
List with objects to be processed.
78
84
handler: Callable
79
85
Callable to process "records" entries.
@@ -82,7 +88,7 @@ def __call__(self, records: List[Any], handler: Callable):
82
88
self .handler = handler
83
89
return self
84
90
85
- def success_handler (self , record : Any , result : Any ):
91
+ def success_handler (self , record : dict , result : Any ) -> SuccessCallback :
86
92
"""
87
93
Success callback
88
94
@@ -95,7 +101,7 @@ def success_handler(self, record: Any, result: Any):
95
101
self .success_messages .append (record )
96
102
return entry
97
103
98
- def failure_handler (self , record : Any , exception : Tuple ) :
104
+ def failure_handler (self , record : dict , exception : _OptExcInfo ) -> FailureCallback :
99
105
"""
100
106
Failure callback
101
107
@@ -196,17 +202,17 @@ def _prepare(self):
196
202
self .fail_messages .clear ()
197
203
self .batch_response = self .DEFAULT_RESPONSE
198
204
199
- def _process_record (self , record ) -> Tuple :
205
+ def _process_record (self , record : dict ) -> Union [ SuccessCallback , FailureCallback ] :
200
206
"""
201
207
Process a record with instance's handler
202
208
203
209
Parameters
204
210
----------
205
- record: Any
206
- An object to be processed.
211
+ record: dict
212
+ A batch record to be processed.
207
213
"""
208
214
try :
209
- data = self ._DATA_CLASS_MAPPING [ self .event_type ]( record )
215
+ data = self ._to_batch_type ( record , event_type = self .event_type )
210
216
result = self .handler (record = data )
211
217
return self .success_handler (record = record , result = result )
212
218
except Exception :
@@ -244,3 +250,8 @@ def _collect_kinesis_failures(self):
244
250
245
251
def _collect_dynamodb_failures (self ):
246
252
return {"itemIdentifier" : msg ["dynamodb" ]["SequenceNumber" ] for msg in self .fail_messages }
253
+
254
+ def _to_batch_type (
255
+ self , record : dict , event_type : EventType
256
+ ) -> Union [SQSRecord , KinesisStreamRecord , DynamoDBRecord ]:
257
+ return self ._DATA_CLASS_MAPPING [event_type ](record ) # type: ignore # since DictWrapper inference is incorrect
0 commit comments