Description
Expected Behaviour
The Lambda shouldn't fail if at least one record is processed successfully; it should return {batchItemFailures: [...] } instead.
In the example, the result of the Lambda should look like this:
{ "batchItemFailures": [ { "itemIdentifier": "messageId-2" } ] }
Current Behaviour
The code throws an exception without processing all records in the batch.
Traceback (most recent call last):
  File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 90, in <module>
    lambda_handler(event,{})
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\tracing\tracer.py", line 305, in decorate
    response = lambda_handler(event, context, **kwargs)
  File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 42, in lambda_handler
    processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in process
    return [self._process_record(record) for record in self.records]
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in <listcomp>
    return [self._process_record(record) for record in self.records]
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 474, in _process_record
    data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 347, in _to_batch_type
    return model.parse_obj(record)
  File "pydantic\main.py", line 526, in pydantic.main.BaseModel.parse_obj
  File "pydantic\main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for OrderSqsRecord
body -> item
  value is not a valid dict (type=type_error.dict)
Code snippet
import json
from typing import Any, List, Literal, Union
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.batch import (BatchProcessor,
EventType,
FailureResponse,
SuccessResponse,
batch_processor)
from aws_lambda_powertools.utilities.parser import BaseModel, validator
class Order(BaseModel):
    """Payload expected inside the body of each SQS message."""

    # Must be a dict; any other JSON type fails validation.
    item: dict
class OrderSqsRecord(SqsRecordModel):
    """SQS record model whose ``body`` is parsed into an :class:`Order`."""

    body: Order

    @validator("body", pre=True)
    def transform_body_to_dict(cls, value: str):
        """Decode the raw JSON body string before field validation.

        Runs as a ``pre`` validator so that the nested ``Order`` model is
        built from the decoded object rather than from the raw string.
        """
        decoded = json.loads(value)
        return decoded
# Batch processor for SQS events; `OrderSqsRecord` is passed as the model used
# to parse each record (see `model.parse_obj(record)` in the traceback).
processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
# Powertools tracer and logger shared by the handlers below.
tracer = Tracer()
logger = Logger()
@tracer.capture_method
def record_handler(record: OrderSqsRecord):
    """Handle a single parsed SQS record and return its order ``item``."""
    order = record.body
    return order.item
@tracer.capture_lambda_handler
def lambda_handler(event, context):
    """Process every record of an SQS event through the batch processor.

    Args:
        event: Lambda event; its ``"Records"`` list is handed to the processor.
        context: Lambda context object (not used here).

    Returns:
        Whatever ``processor.response()`` reports after processing; per the
        issue this is expected to be ``{"batchItemFailures": [...]}``.
    """
    with processor(records=event["Records"], handler=record_handler):
        # The per-record (status, result, record) tuples returned by
        # process() were only unpacked into unused locals, so they are
        # not kept.
        processor.process()

    # Build the response once and reuse it for both logging and returning.
    response = processor.response()
    logger.info(response)
    return response
if __name__ == "__main__":
    # Local reproduction: invoke the handler with a two-record SQS event.

    # Body is valid JSON, but `item` is an int while `Order.item` is declared
    # as a dict; this matches the "body -> item ... not a valid dict" error
    # in the reported traceback.
    first_record = {
        "messageId": "messageId-1",
        "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
        "body": '{"item": 1}',
        "attributes": {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1545082649183",
            "SenderId": "1",
            "ApproximateFirstReceiveTimestamp": "1545082649185",
        },
        "messageAttributes": {},
        "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
        "eventSource": "aws:sqs",
        "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
        "awsRegion": "us-east-2",
    }

    # Body is not JSON at all, so it cannot be parsed into an Order.
    second_record = {
        "messageId": "messageId-2",
        "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
        "body": 'Hi',
        "attributes": {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1545082649183",
            "SenderId": "AIDAIENQZJOLO23YVJ4VO",
            "ApproximateFirstReceiveTimestamp": "1545082649185",
        },
        "messageAttributes": {},
        "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
        "eventSource": "aws:sqs",
        "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
        "awsRegion": "us-east-2",
    }

    event = {"Records": [first_record, second_record]}
    lambda_handler(event, {})
Possible Solution
I think it can be easily fixed by moving the code that converts the record to the model inside the try block (https://github.com/awslabs/aws-lambda-powertools-python/blob/develop/aws_lambda_powertools/utilities/batch/base.py#L474)
But perhaps there are reasons why it was decided not to do so. Please share your thoughts.
Steps to Reproduce
You can run the code snippet above. You can also try any test case with two or more records in one batch where one record does not conform to your model.
AWS Lambda Powertools for Python version
latest
AWS Lambda function runtime
3.9
Packaging format used
PyPi
Debugging logs
Traceback (most recent call last):
File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 90, in <module>
lambda_handler(event,{})
File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\tracing\tracer.py", line 305, in decorate
response = lambda_handler(event, context, **kwargs)
File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 42, in lambda_handler
processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in process
return [self._process_record(record) for record in self.records]
File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in <listcomp>
return [self._process_record(record) for record in self.records]
File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 474, in _process_record
data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 347, in _to_batch_type
return model.parse_obj(record)
File "pydantic\main.py", line 526, in pydantic.main.BaseModel.parse_obj
File "pydantic\main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for OrderSqsRecord
body -> item
value is not a valid dict (type=type_error.dict)