diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a6db9ffdb2..0ddd1529534 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2389,4 +2389,4 @@ [v1.0.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v1.0.0...v1.0.1 [v1.0.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.11.0...v1.0.0 [v0.11.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.1...v0.11.0 -[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1 +[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1 \ No newline at end of file diff --git a/aws_lambda_powertools/shared/functions.py b/aws_lambda_powertools/shared/functions.py index 37621f8274a..e9bc3521125 100644 --- a/aws_lambda_powertools/shared/functions.py +++ b/aws_lambda_powertools/shared/functions.py @@ -1,5 +1,10 @@ +import base64 +import logging +from binascii import Error as BinAsciiError from typing import Optional, Union +logger = logging.getLogger(__name__) + def strtobool(value: str) -> bool: """Convert a string representation of truth to True or False. 
@@ -58,3 +63,18 @@ def resolve_env_var_choice( resolved choice as either bool or environment value """ return choice if choice is not None else env + + +def base64_decode(value: str) -> bytes: + try: + logger.debug("Decoding base64 Kafka record item before parsing") + return base64.b64decode(value) + except (BinAsciiError, TypeError): + raise ValueError("base64 decode failed") + + +def bytes_to_string(value: bytes) -> str: + try: + return value.decode("utf-8") + except (UnicodeDecodeError, TypeError): + raise ValueError("base64 UTF-8 decode failed") diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index 5e9201dae9f..e52cc5d8dc1 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -85,10 +85,11 @@ def get_header_value( class KafkaEvent(DictWrapper): - """Self-managed Apache Kafka event trigger + """Self-managed or MSK Apache Kafka event trigger Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html + - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html """ @property @@ -98,7 +99,7 @@ def event_source(self) -> str: @property def event_source_arn(self) -> Optional[str]: - """The AWS service ARN from which the Kafka event record originated.""" + """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn") @property diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index 7d42fd81ad6..4b0e4c943a2 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -4,6 +4,7 @@ from .cloudwatch import CloudWatchLogsEnvelope from .dynamodb import DynamoDBStreamEnvelope from .event_bridge import EventBridgeEnvelope +from .kafka import 
KafkaEnvelope from .kinesis import KinesisDataStreamEnvelope from .lambda_function_url import LambdaFunctionUrlEnvelope from .sns import SnsEnvelope, SnsSqsEnvelope @@ -20,5 +21,6 @@ "SnsEnvelope", "SnsSqsEnvelope", "SqsEnvelope", + "KafkaEnvelope", "BaseEnvelope", ] diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kafka.py b/aws_lambda_powertools/utilities/parser/envelopes/kafka.py new file mode 100644 index 00000000000..4e6564c3ec0 --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/envelopes/kafka.py @@ -0,0 +1,44 @@ +import logging +from typing import Any, Dict, List, Optional, Type, Union, cast + +from ..models import KafkaMskEventModel, KafkaSelfManagedEventModel +from ..types import Model +from .base import BaseEnvelope + +logger = logging.getLogger(__name__) + + +class KafkaEnvelope(BaseEnvelope): + """Kafka event envelope to extract data within value key + The record's value parameter is a string, though it can also be a JSON encoded string. + Regardless of its type it'll be parsed into a BaseModel object. 
+ + + Note: Records will be parsed the same way so if model is str, + all items in the list will be parsed as str and not as JSON (and vice versa) + """ + + def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: + """Parses data found with model provided + + Parameters + ---------- + data : Dict + Lambda event to be parsed + model : Type[Model] + Data model provided to parse after extracting data using envelope + + Returns + ------- + List + List of records parsed with model provided + """ + event_source = cast(dict, data).get("eventSource") + model_parse_event = KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel + + logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}") + parsed_envelope = model_parse_event.parse_obj(data) + logger.debug(f"Parsing Kafka event records in `value` with {model}") + ret_list = [] + for records in parsed_envelope.records.values(): + ret_list += [self._parse(data=record.value, model=model) for record in records] + return ret_list diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index 11ab6501fa9..6d403019181 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -17,6 +17,7 @@ from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel from .event_bridge import EventBridgeModel +from .kafka import KafkaBaseEventModel, KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload from .lambda_function_url import LambdaFunctionUrlModel from .s3 import S3Model, S3RecordModel @@ -98,4 +99,8 @@ 
"APIGatewayEventRequestContext", "APIGatewayEventAuthorizer", "APIGatewayEventIdentity", + "KafkaSelfManagedEventModel", + "KafkaRecordModel", + "KafkaMskEventModel", + "KafkaBaseEventModel", ] diff --git a/aws_lambda_powertools/utilities/parser/models/kafka.py b/aws_lambda_powertools/utilities/parser/models/kafka.py new file mode 100644 index 00000000000..d4c36bf70f1 --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/kafka.py @@ -0,0 +1,65 @@ +from datetime import datetime +from typing import Dict, List, Type, Union + +from pydantic import BaseModel, validator + +from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string +from aws_lambda_powertools.utilities.parser.types import Literal + +SERVERS_DELIMITER = "," + + +class KafkaRecordModel(BaseModel): + topic: str + partition: int + offset: int + timestamp: datetime + timestampType: str + key: bytes + value: Union[str, Type[BaseModel]] + headers: List[Dict[str, bytes]] + + # validators + _decode_key = validator("key", allow_reuse=True)(base64_decode) + + @validator("value", pre=True, allow_reuse=True) + def data_base64_decode(cls, value): + as_bytes = base64_decode(value) + return bytes_to_string(as_bytes) + + @validator("headers", pre=True, allow_reuse=True) + def decode_headers_list(cls, value): + for header in value: + for key, values in header.items(): + header[key] = bytes(values) + return value + + +class KafkaBaseEventModel(BaseModel): + bootstrapServers: List[str] + records: Dict[str, List[KafkaRecordModel]] + + @validator("bootstrapServers", pre=True, allow_reuse=True) + def split_servers(cls, value): + return None if not value else value.split(SERVERS_DELIMITER) + + +class KafkaSelfManagedEventModel(KafkaBaseEventModel): + """Self-managed Apache Kafka event trigger + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html + """ + + eventSource: Literal["aws:SelfManagedKafka"] + + +class KafkaMskEventModel(KafkaBaseEventModel): 
+ """Fully-managed AWS Apache Kafka event trigger + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html + """ + + eventSource: Literal["aws:kafka"] + eventSourceArn: str diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index 97b005a9fb5..e97395ae56c 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -168,6 +168,8 @@ Parser comes with the following built-in models: | **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway | | **APIGatewayProxyEventV2Model** | Lambda Event Source payload for Amazon API Gateway v2 payload | | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload | +| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload | +| **KafkaMskEventModel** | Lambda Event Source payload for AWS MSK payload | ### extending built-in models @@ -308,6 +310,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return | **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`.
2. Parses `body` key using your model and returns it. | `Model` | | **ApiGatewayV2Envelope** | 1. Parses data using `APIGatewayProxyEventV2Model`.
2. Parses `body` key using your model and returns it. | `Model` | | **LambdaFunctionUrlEnvelope** | 1. Parses data using `LambdaFunctionUrlModel`.
2. Parses `body` key using your model and returns it. | `Model` | +| **KafkaEnvelope** | 1. Parses data using `KafkaRecordModel`.
2. Parses `value` key using your model and returns it. | `Model` | ### Bringing your own envelope diff --git a/tests/events/kafkaEventSelfManaged.json b/tests/events/kafkaEventSelfManaged.json index 17372b7c243..22985dd11dd 100644 --- a/tests/events/kafkaEventSelfManaged.json +++ b/tests/events/kafkaEventSelfManaged.json @@ -1,5 +1,5 @@ { - "eventSource":"aws:aws:SelfManagedKafka", + "eventSource":"aws:SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py index 79a74f8eb53..b1b66c63379 100644 --- a/tests/functional/parser/schemas.py +++ b/tests/functional/parser/schemas.py @@ -91,3 +91,7 @@ class MyApiGatewayBusiness(BaseModel): class MyALambdaFuncUrlBusiness(BaseModel): message: str username: str + + +class MyLambdaKafkaBusiness(BaseModel): + key: str diff --git a/tests/functional/parser/test_kafka.py b/tests/functional/parser/test_kafka.py new file mode 100644 index 00000000000..f764106add4 --- /dev/null +++ b/tests/functional/parser/test_kafka.py @@ -0,0 +1,93 @@ +from typing import List + +from aws_lambda_powertools.utilities.parser import envelopes, event_parser +from aws_lambda_powertools.utilities.parser.models import ( + KafkaMskEventModel, + KafkaRecordModel, + KafkaSelfManagedEventModel, +) +from aws_lambda_powertools.utilities.typing import LambdaContext +from tests.functional.parser.schemas import MyLambdaKafkaBusiness +from tests.functional.utils import load_event + + +@event_parser(model=MyLambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope) +def handle_lambda_kafka_with_envelope(event: List[MyLambdaKafkaBusiness], _: LambdaContext): + assert event[0].key == "value" + assert len(event) == 1 + + +@event_parser(model=KafkaSelfManagedEventModel) +def handle_kafka_event(event: KafkaSelfManagedEventModel, _: LambdaContext): + return 
event + + +def test_kafka_msk_event_with_envelope(): + event = load_event("kafkaEventMsk.json") + handle_lambda_kafka_with_envelope(event, LambdaContext()) + + +def test_kafka_self_managed_event_with_envelope(): + event = load_event("kafkaEventSelfManaged.json") + handle_lambda_kafka_with_envelope(event, LambdaContext()) + + +def test_self_managed_kafka_event(): + json_event = load_event("kafkaEventSelfManaged.json") + event: KafkaSelfManagedEventModel = handle_kafka_event(json_event, LambdaContext()) + assert event.eventSource == "aws:SelfManagedKafka" + bootstrap_servers = [ + "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + ] + assert event.bootstrapServers == bootstrap_servers + + records = list(event.records["mytopic-0"]) + assert len(records) == 1 + record: KafkaRecordModel = records[0] + assert record.topic == "mytopic" + assert record.partition == 0 + assert record.offset == 15 + assert record.timestamp is not None + convert_time = int(round(record.timestamp.timestamp() * 1000)) + assert convert_time == 1545084650987 + assert record.timestampType == "CREATE_TIME" + assert record.key == b"recordKey" + assert record.value == '{"key":"value"}' + assert len(record.headers) == 1 + assert record.headers[0]["headerKey"] == b"headerValue" + + +@event_parser(model=KafkaMskEventModel) +def handle_msk_event(event: KafkaMskEventModel, _: LambdaContext): + return event + + +def test_kafka_msk_event(): + json_event = load_event("kafkaEventMsk.json") + event: KafkaMskEventModel = handle_msk_event(json_event, LambdaContext()) + assert event.eventSource == "aws:kafka" + bootstrap_servers = [ + "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + ] + assert event.bootstrapServers == bootstrap_servers + assert ( + event.eventSourceArn + == 
"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4" + ) + + records = list(event.records["mytopic-0"]) + assert len(records) == 1 + record: KafkaRecordModel = records[0] + assert record.topic == "mytopic" + assert record.partition == 0 + assert record.offset == 15 + assert record.timestamp is not None + convert_time = int(round(record.timestamp.timestamp() * 1000)) + assert convert_time == 1545084650987 + assert record.timestampType == "CREATE_TIME" + assert record.key == b"recordKey" + assert record.value == '{"key":"value"}' + assert len(record.headers) == 1 + assert record.headers[0]["headerKey"] == b"headerValue" diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 00dd5100f67..dbef57162e2 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1174,7 +1174,7 @@ def test_kafka_msk_event(): def test_kafka_self_managed_event(): event = KafkaEvent(load_event("kafkaEventSelfManaged.json")) - assert event.event_source == "aws:aws:SelfManagedKafka" + assert event.event_source == "aws:SelfManagedKafka" bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" # noqa E501