@@ -236,12 +236,12 @@ def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallbac
236
236
record: dict
237
237
A batch record to be processed.
238
238
"""
239
+ data = self ._to_batch_type (record = record , event_type = self .event_type , model = self .model )
239
240
try :
240
- data = self ._to_batch_type (record = record , event_type = self .event_type , model = self .model )
241
241
result = self .handler (record = data )
242
242
return self .success_handler (record = record , result = result )
243
243
except Exception :
244
- return self .failure_handler (record = record , exception = sys .exc_info ())
244
+ return self .failure_handler (record = data , exception = sys .exc_info ())
245
245
246
246
def _clean (self ):
247
247
"""
@@ -268,7 +268,7 @@ def _get_messages_to_report(self) -> Dict[str, str]:
268
268
return self ._COLLECTOR_MAPPING [self .event_type ]()
269
269
270
270
def _collect_sqs_failures (self ):
271
- return {"itemIdentifier" : msg .messageId for msg in self .fail_messages }
271
+ return {"itemIdentifier" : msg .message_id for msg in self .fail_messages }
272
272
273
273
def _collect_kinesis_failures (self ):
274
274
return {"itemIdentifier" : msg .kinesis .sequence_number for msg in self .fail_messages }
@@ -285,7 +285,7 @@ def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceData
285
285
...
286
286
287
287
def _to_batch_type (self , record : dict , event_type : EventType , model : Optional ["BatchTypeModels" ] = None ):
288
- if model :
288
+ if model is not None :
289
289
return model .parse_obj (record )
290
290
else :
291
291
return self ._DATA_CLASS_MAPPING [event_type ](record )
0 commit comments