Closed
Description
Expected Behaviour
Code examples for the Pydantic integration should be valid: they should not have missing imports or reference undefined classes.
Current Behaviour
All of the Pydantic integration code examples have missing imports or reference classes that do not exist.
- Kinesis Data Streams example
- DynamoDB Streams example
Code snippet
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    item: dict


class OrderSqsRecord(SqsRecordModel):
    body: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("body", pre=True)
    def transform_body_to_dict(cls, value: str):
        return json.loads(value)


processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderSqsRecord):
    return record.body.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
Possible Solution
Fix the missing imports and validate the code examples against a compiler. A fix PR exists (#1114), and the fixed documentation is deployed at https://gyft.github.io/aws-lambda-powertools-python/latest/utilities/batch/#pydantic-integration
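One way the validation step could be automated is a small static check that parses each documentation snippet and lists names that are read but never imported or defined. The sketch below is only an illustration: `undefined_names`, `BROKEN` and `FIXED` are hypothetical names, the check ignores scoping rules, and it is not part of the linked PR. A linter such as pyflakes performs the same kind of check more thoroughly.

```python
import ast
import builtins


def undefined_names(source: str) -> set:
    """Return names read in `source` that are never imported, defined, or built in.

    Simplified on purpose: scopes are ignored, so this can miss some errors.
    """
    tree = ast.parse(source)  # raises SyntaxError if the snippet does not parse
    defined = set(dir(builtins))
    used = set()
    for node in ast.walk(tree):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                # "import a.b" binds "a"; "import a as x" binds "x"
                defined.add((alias.asname or alias.name).split(".")[0])
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            defined.add(node.name)
        elif isinstance(node, ast.arg):
            defined.add(node.arg)
        elif isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Load):
                used.add(node.id)
            else:
                defined.add(node.id)
    return used - defined


# Shortened version of the example from this issue, without the parser import.
BROKEN = '''
import json
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel

class Order(BaseModel):
    item: dict

class OrderSqsRecord(SqsRecordModel):
    body: Order

    @validator("body", pre=True)
    def transform_body_to_dict(cls, value: str):
        return json.loads(value)
'''

# Same snippet with the import that the fixed example adds.
FIXED = "from aws_lambda_powertools.utilities.parser import BaseModel, validator\n" + BROKEN

print(sorted(undefined_names(BROKEN)))  # ['BaseModel', 'validator']
print(sorted(undefined_names(FIXED)))   # []
```

Because the check only parses the text, it does not need `aws_lambda_powertools` to be installed, which would make it cheap to run over every snippet in the documentation.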
Fixed example
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    item: dict


class OrderSqsRecord(SqsRecordModel):
    body: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("body", pre=True)
    def transform_body_to_dict(cls, value: str):
        return json.loads(value)


processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderSqsRecord):
    return record.body.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
Steps to Reproduce
- Copy any of the Pydantic integration examples
- See that there are missing imports and errors
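The failure can also be reproduced without installing anything. The sketch below uses a hypothetical stand-in `fragment` containing only the first class of the documented example; it is not the full snippet. It shows that the code is syntactically valid, so byte-compiling succeeds, and that the error only appears when the class statement is executed.

```python
# Stand-in for the first lines of the documented example (assumption: shortened
# here so that no third-party package is needed to run it).
fragment = '''
class Order(BaseModel):
    item: dict
'''

# Compiling succeeds because the syntax is valid; names are not resolved yet.
code = compile(fragment, "<docs-example>", "exec")

try:
    exec(code, {})  # executing the class statement looks up BaseModel
    error = None
except NameError as exc:
    error = str(exc)

print(error)  # name 'BaseModel' is not defined
```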
AWS Lambda Powertools for Python version
latest
AWS Lambda function runtime
3.9
Packaging format used
PyPi
Debugging logs
N/A