Error: Using batch processing with Pydantic models doesn't follow the main idea of batch processing #2091

Closed
@nepshshsh

Description

Expected Behaviour

Lambda shouldn't fail if at least one record is processed successfully; it should return {"batchItemFailures": [...]}.

In the example, the Lambda result should look like this:
{ "batchItemFailures": [ { "itemIdentifier": "messageId-2" } ] }
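
To make the expected contract concrete, here is a minimal sketch of how the response could be derived from per-record outcomes. It assumes the invocation should only fail when every record in the batch fails; `Result`, `AllRecordsFailed`, and `partial_response` are hypothetical names used for illustration and are not part of Powertools.

```python
from typing import Dict, List, Tuple

# Hypothetical per-record outcome: (status, message_id),
# where status is either "success" or "fail".
Result = Tuple[str, str]


class AllRecordsFailed(Exception):
    """Hypothetical error for a batch in which no record succeeded."""


def partial_response(results: List[Result]) -> Dict[str, List[Dict[str, str]]]:
    """Build a partial batch response from per-record outcomes."""
    failed_ids = [message_id for status, message_id in results if status == "fail"]
    # Assumption: fail the whole invocation only when nothing succeeded.
    if results and len(failed_ids) == len(results):
        raise AllRecordsFailed(f"all {len(results)} records failed")
    return {"batchItemFailures": [{"itemIdentifier": mid} for mid in failed_ids]}


response = partial_response([("success", "messageId-1"), ("fail", "messageId-2")])
print(response)
```

With one success and one failure, only the failed message ID is reported back, so SQS would redeliver just that message.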

Current Behaviour

The code throws an exception without processing all records in the batch:

Traceback (most recent call last):
  File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 90, in <module>
    lambda_handler(event,{})
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\tracing\tracer.py", line 305, in decorate
    response = lambda_handler(event, context, **kwargs)
  File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 42, in lambda_handler
    processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in process
    return [self._process_record(record) for record in self.records]
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in <listcomp>
    return [self._process_record(record) for record in self.records]
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 474, in _process_record
    data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 347, in _to_batch_type
    return model.parse_obj(record)
  File "pydantic\main.py", line 526, in pydantic.main.BaseModel.parse_obj
  File "pydantic\main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for OrderSqsRecord
body -> item
  value is not a valid dict (type=type_error.dict)

Code snippet

import json
from typing import Any, List, Literal, Union

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    FailureResponse,
    SuccessResponse,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel


class Order(BaseModel):
    item: dict

class OrderSqsRecord(SqsRecordModel):
    body: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("body", pre=True)
    def transform_body_to_dict(cls, value: str):
        return json.loads(value)

processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderSqsRecord):
    return record.body.item

@tracer.capture_lambda_handler
def lambda_handler(event, context):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()

    for message in processed_messages:
        status: Union[Literal["success"], Literal["fail"]] = message[0]
        result: Any = message[1]
        record: SQSRecord = message[2]

    logger.info(processor.response())
    return processor.response()


if __name__ == "__main__":
    event = {
        "Records": [
            {
                "messageId": "messageId-1",
                "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
                "body": '{"item": 1}',
                "attributes": {
                    "ApproximateReceiveCount": "1",
                    "SentTimestamp": "1545082649183",
                    "SenderId": "1",
                    "ApproximateFirstReceiveTimestamp": "1545082649185"
                },
                "messageAttributes": {},
                "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
                "eventSource": "aws:sqs",
                "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
                "awsRegion": "us-east-2"
            },
            {
                "messageId": "messageId-2",
                "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
                "body": 'Hi',
                "attributes": {
                    "ApproximateReceiveCount": "1",
                    "SentTimestamp": "1545082649183",
                    "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                    "ApproximateFirstReceiveTimestamp": "1545082649185"
                },
                "messageAttributes": {},
                "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
                "eventSource": "aws:sqs",
                "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
                "awsRegion": "us-east-2"
            }
        ]
    }
    lambda_handler(event,{})

Possible Solution

I think this can be fixed easily by moving the code that converts the record to the model inside the try block (https://github.com/awslabs/aws-lambda-powertools-python/blob/develop/aws_lambda_powertools/utilities/batch/base.py#L474).

But perhaps there are reasons why it was decided not to do so. Please share your thoughts.
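
As a rough illustration of that idea, the sketch below parses each record inside the same try block that runs the handler, so a malformed record is marked as failed instead of aborting the batch. It is standard-library only and is not the Powertools implementation: `parse_order_record` is a hypothetical stand-in for `model.parse_obj(record)`, and `process_record` and `process_batch` are illustrative names. The first sample body uses a JSON object for `item` so that it passes the dict check.

```python
import json
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]
# (status, handler result or error text, raw record)
Result = Tuple[str, Any, Record]


def parse_order_record(record: Record) -> Record:
    """Hypothetical stand-in for model.parse_obj(record).

    The body must be a JSON object whose `item` field is a dict.
    """
    body = json.loads(record["body"])
    if not isinstance(body, dict) or not isinstance(body.get("item"), dict):
        raise ValueError("body -> item: value is not a valid dict")
    return {**record, "body": body}


def process_record(record: Record, handler: Callable[[Record], Any]) -> Result:
    try:
        # Parsing happens inside the try block, so a validation error
        # is recorded as a failure for this record only.
        data = parse_order_record(record)
        return ("success", handler(data), record)
    except Exception as exc:
        return ("fail", str(exc), record)


def process_batch(records: List[Record], handler: Callable[[Record], Any]) -> Dict[str, Any]:
    results = [process_record(record, handler) for record in records]
    failures = [
        {"itemIdentifier": record["messageId"]}
        for status, _, record in results
        if status == "fail"
    ]
    return {"batchItemFailures": failures}


records = [
    {"messageId": "messageId-1", "body": '{"item": {"id": 1}}'},
    {"messageId": "messageId-2", "body": "Hi"},
]
response = process_batch(records, handler=lambda data: data["body"]["item"])
print(response)
```

Here the second record fails JSON decoding, yet the first one is still processed and only `messageId-2` is reported.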

Steps to Reproduce

Run the code snippet above. You can also try any test case with two or more records in one batch where one record doesn't conform to your model.
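
For building such test cases quickly, a small helper along these lines may be convenient. `make_sqs_event` is a hypothetical helper, not a Powertools function; the field values mirror the sample event in the snippet, and the receipt handle is a placeholder.

```python
import hashlib
from typing import Any, Dict, List


def make_sqs_event(bodies: List[str]) -> Dict[str, Any]:
    """Build a minimal SQS-style event with one record per body (hypothetical helper)."""
    return {
        "Records": [
            {
                "messageId": f"messageId-{index}",
                "receiptHandle": "test-receipt-handle",  # placeholder value
                "body": body,
                "attributes": {
                    "ApproximateReceiveCount": "1",
                    "SentTimestamp": "1545082649183",
                    "SenderId": "1",
                    "ApproximateFirstReceiveTimestamp": "1545082649185",
                },
                "messageAttributes": {},
                "md5OfBody": hashlib.md5(body.encode("utf-8")).hexdigest(),
                "eventSource": "aws:sqs",
                "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
                "awsRegion": "us-east-2",
            }
            for index, body in enumerate(bodies, start=1)
        ]
    }


# Three records; the second body is not valid JSON and should be the only failure.
event = make_sqs_event(['{"item": {"id": 1}}', "Hi", '{"item": {"id": 3}}'])
print([record["messageId"] for record in event["Records"]])
```

The resulting `event` can be passed to `lambda_handler(event, {})` from the snippet to check which message IDs end up in `batchItemFailures`.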

AWS Lambda Powertools for Python version

latest

AWS Lambda function runtime

3.9

Packaging format used

PyPi

Debugging logs

Traceback (most recent call last):
  File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 90, in <module>
    lambda_handler(event,{})
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\tracing\tracer.py", line 305, in decorate
    response = lambda_handler(event, context, **kwargs)
  File "C:\Users\Crumpet\Desktop\AWS\powertools\model_test.py", line 42, in lambda_handler
    processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in process
    return [self._process_record(record) for record in self.records]
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 97, in <listcomp>
    return [self._process_record(record) for record in self.records]
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 474, in _process_record
    data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
  File "C:\Users\Crumpet\AppData\Local\Programs\Python39\lib\site-packages\aws_lambda_powertools\utilities\batch\base.py", line 347, in _to_batch_type
    return model.parse_obj(record)
  File "pydantic\main.py", line 526, in pydantic.main.BaseModel.parse_obj
  File "pydantic\main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for OrderSqsRecord
body -> item
  value is not a valid dict (type=type_error.dict)

Labels

batch (Batch processing utility), bug (Something isn't working)
