diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2a6db9ffdb2..0ddd1529534 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2389,4 +2389,4 @@
[v1.0.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v1.0.0...v1.0.1
[v1.0.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.11.0...v1.0.0
[v0.11.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.1...v0.11.0
-[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1
+[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1
\ No newline at end of file
diff --git a/aws_lambda_powertools/shared/functions.py b/aws_lambda_powertools/shared/functions.py
index 37621f8274a..e9bc3521125 100644
--- a/aws_lambda_powertools/shared/functions.py
+++ b/aws_lambda_powertools/shared/functions.py
@@ -1,5 +1,10 @@
+import base64
+import logging
+from binascii import Error as BinAsciiError
from typing import Optional, Union
+logger = logging.getLogger(__name__)
+
def strtobool(value: str) -> bool:
"""Convert a string representation of truth to True or False.
@@ -58,3 +63,18 @@ def resolve_env_var_choice(
resolved choice as either bool or environment value
"""
return choice if choice is not None else env
+
+
+def base64_decode(value: str) -> bytes:
+ try:
+ logger.debug("Decoding base64 Kafka record item before parsing")
+ return base64.b64decode(value)
+ except (BinAsciiError, TypeError):
+ raise ValueError("base64 decode failed")
+
+
+def bytes_to_string(value: bytes) -> str:
+ try:
+ return value.decode("utf-8")
+ except (BinAsciiError, TypeError):
+ raise ValueError("base64 UTF-8 decode failed")
diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py
index 5e9201dae9f..e52cc5d8dc1 100644
--- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py
@@ -85,10 +85,11 @@ def get_header_value(
class KafkaEvent(DictWrapper):
- """Self-managed Apache Kafka event trigger
+ """Self-managed or MSK Apache Kafka event trigger
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+ - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
"""
@property
@@ -98,7 +99,7 @@ def event_source(self) -> str:
@property
def event_source_arn(self) -> Optional[str]:
- """The AWS service ARN from which the Kafka event record originated."""
+ """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
return self.get("eventSourceArn")
@property
diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
index 7d42fd81ad6..4b0e4c943a2 100644
--- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
+++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
@@ -4,6 +4,7 @@
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
+from .kafka import KafkaEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .lambda_function_url import LambdaFunctionUrlEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
@@ -20,5 +21,6 @@
"SnsEnvelope",
"SnsSqsEnvelope",
"SqsEnvelope",
+ "KafkaEnvelope",
"BaseEnvelope",
]
diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kafka.py b/aws_lambda_powertools/utilities/parser/envelopes/kafka.py
new file mode 100644
index 00000000000..4e6564c3ec0
--- /dev/null
+++ b/aws_lambda_powertools/utilities/parser/envelopes/kafka.py
@@ -0,0 +1,44 @@
+import logging
+from typing import Any, Dict, List, Optional, Type, Union, cast
+
+from ..models import KafkaMskEventModel, KafkaSelfManagedEventModel
+from ..types import Model
+from .base import BaseEnvelope
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaEnvelope(BaseEnvelope):
+    """Kafka event envelope to extract data within the `value` key
+    The record's `value` parameter is a string, though it can also be a JSON encoded string.
+ Regardless of its type it'll be parsed into a BaseModel object.
+
+ Note: Records will be parsed the same way so if model is str,
+    all items in the list will be parsed as str and not as JSON (and vice versa)
+ """
+
+ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
+ """Parses data found with model provided
+
+ Parameters
+ ----------
+ data : Dict
+ Lambda event to be parsed
+ model : Type[Model]
+ Data model provided to parse after extracting data using envelope
+
+ Returns
+ -------
+ List
+ List of records parsed with model provided
+ """
+ event_source = cast(dict, data).get("eventSource")
+ model_parse_event = KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
+
+ logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
+ parsed_envelope = model_parse_event.parse_obj(data)
+ logger.debug(f"Parsing Kafka event records in `value` with {model}")
+ ret_list = []
+ for records in parsed_envelope.records.values():
+ ret_list += [self._parse(data=record.value, model=model) for record in records]
+ return ret_list
diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py
index 11ab6501fa9..6d403019181 100644
--- a/aws_lambda_powertools/utilities/parser/models/__init__.py
+++ b/aws_lambda_powertools/utilities/parser/models/__init__.py
@@ -17,6 +17,7 @@
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
+from .kafka import KafkaBaseEventModel, KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel
from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
from .lambda_function_url import LambdaFunctionUrlModel
from .s3 import S3Model, S3RecordModel
@@ -98,4 +99,8 @@
"APIGatewayEventRequestContext",
"APIGatewayEventAuthorizer",
"APIGatewayEventIdentity",
+ "KafkaSelfManagedEventModel",
+ "KafkaRecordModel",
+ "KafkaMskEventModel",
+ "KafkaBaseEventModel",
]
diff --git a/aws_lambda_powertools/utilities/parser/models/kafka.py b/aws_lambda_powertools/utilities/parser/models/kafka.py
new file mode 100644
index 00000000000..d4c36bf70f1
--- /dev/null
+++ b/aws_lambda_powertools/utilities/parser/models/kafka.py
@@ -0,0 +1,65 @@
+from datetime import datetime
+from typing import Dict, List, Type, Union
+
+from pydantic import BaseModel, validator
+
+from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string
+from aws_lambda_powertools.utilities.parser.types import Literal
+
+SERVERS_DELIMITER = ","
+
+
+class KafkaRecordModel(BaseModel):
+ topic: str
+ partition: int
+ offset: int
+ timestamp: datetime
+ timestampType: str
+ key: bytes
+ value: Union[str, Type[BaseModel]]
+ headers: List[Dict[str, bytes]]
+
+ # validators
+ _decode_key = validator("key", allow_reuse=True)(base64_decode)
+
+ @validator("value", pre=True, allow_reuse=True)
+ def data_base64_decode(cls, value):
+ as_bytes = base64_decode(value)
+ return bytes_to_string(as_bytes)
+
+ @validator("headers", pre=True, allow_reuse=True)
+ def decode_headers_list(cls, value):
+ for header in value:
+ for key, values in header.items():
+ header[key] = bytes(values)
+ return value
+
+
+class KafkaBaseEventModel(BaseModel):
+ bootstrapServers: List[str]
+ records: Dict[str, List[KafkaRecordModel]]
+
+ @validator("bootstrapServers", pre=True, allow_reuse=True)
+ def split_servers(cls, value):
+ return None if not value else value.split(SERVERS_DELIMITER)
+
+
+class KafkaSelfManagedEventModel(KafkaBaseEventModel):
+ """Self-managed Apache Kafka event trigger
+ Documentation:
+ --------------
+ - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+ """
+
+ eventSource: Literal["aws:SelfManagedKafka"]
+
+
+class KafkaMskEventModel(KafkaBaseEventModel):
+ """Fully-managed AWS Apache Kafka event trigger
+ Documentation:
+ --------------
+ - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
+ """
+
+ eventSource: Literal["aws:kafka"]
+ eventSourceArn: str
diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md
index 97b005a9fb5..e97395ae56c 100644
--- a/docs/utilities/parser.md
+++ b/docs/utilities/parser.md
@@ -168,6 +168,8 @@ Parser comes with the following built-in models:
| **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
| **APIGatewayProxyEventV2Model** | Lambda Event Source payload for Amazon API Gateway v2 payload |
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
+| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload |
+| **KafkaMskEventModel** | Lambda Event Source payload for AWS MSK payload |
### extending built-in models
@@ -308,6 +310,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
| **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`.
2. Parses `body` key using your model and returns it. | `Model` |
| **ApiGatewayV2Envelope** | 1. Parses data using `APIGatewayProxyEventV2Model`.
2. Parses `body` key using your model and returns it. | `Model` |
| **LambdaFunctionUrlEnvelope** | 1. Parses data using `LambdaFunctionUrlModel`.
2. Parses `body` key using your model and returns it. | `Model` |
+| **KafkaEnvelope** | 1. Parses data using `KafkaSelfManagedEventModel` or `KafkaMskEventModel`.
2. Parses `value` key using your model and returns it. | `Model` |
### Bringing your own envelope
diff --git a/tests/events/kafkaEventSelfManaged.json b/tests/events/kafkaEventSelfManaged.json
index 17372b7c243..22985dd11dd 100644
--- a/tests/events/kafkaEventSelfManaged.json
+++ b/tests/events/kafkaEventSelfManaged.json
@@ -1,5 +1,5 @@
{
- "eventSource":"aws:aws:SelfManagedKafka",
+ "eventSource":"aws:SelfManagedKafka",
"bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records":{
"mytopic-0":[
diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py
index 79a74f8eb53..b1b66c63379 100644
--- a/tests/functional/parser/schemas.py
+++ b/tests/functional/parser/schemas.py
@@ -91,3 +91,7 @@ class MyApiGatewayBusiness(BaseModel):
class MyALambdaFuncUrlBusiness(BaseModel):
message: str
username: str
+
+
+class MyLambdaKafkaBusiness(BaseModel):
+ key: str
diff --git a/tests/functional/parser/test_kafka.py b/tests/functional/parser/test_kafka.py
new file mode 100644
index 00000000000..f764106add4
--- /dev/null
+++ b/tests/functional/parser/test_kafka.py
@@ -0,0 +1,93 @@
+from typing import List
+
+from aws_lambda_powertools.utilities.parser import envelopes, event_parser
+from aws_lambda_powertools.utilities.parser.models import (
+ KafkaMskEventModel,
+ KafkaRecordModel,
+ KafkaSelfManagedEventModel,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+from tests.functional.parser.schemas import MyLambdaKafkaBusiness
+from tests.functional.utils import load_event
+
+
+@event_parser(model=MyLambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
+def handle_lambda_kafka_with_envelope(event: List[MyLambdaKafkaBusiness], _: LambdaContext):
+ assert event[0].key == "value"
+ assert len(event) == 1
+
+
+@event_parser(model=KafkaSelfManagedEventModel)
+def handle_kafka_event(event: KafkaSelfManagedEventModel, _: LambdaContext):
+ return event
+
+
+def test_kafka_msk_event_with_envelope():
+ event = load_event("kafkaEventMsk.json")
+ handle_lambda_kafka_with_envelope(event, LambdaContext())
+
+
+def test_kafka_self_managed_event_with_envelope():
+ event = load_event("kafkaEventSelfManaged.json")
+ handle_lambda_kafka_with_envelope(event, LambdaContext())
+
+
+def test_self_managed_kafka_event():
+ json_event = load_event("kafkaEventSelfManaged.json")
+ event: KafkaSelfManagedEventModel = handle_kafka_event(json_event, LambdaContext())
+ assert event.eventSource == "aws:SelfManagedKafka"
+ bootstrap_servers = [
+ "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ ]
+ assert event.bootstrapServers == bootstrap_servers
+
+ records = list(event.records["mytopic-0"])
+ assert len(records) == 1
+ record: KafkaRecordModel = records[0]
+ assert record.topic == "mytopic"
+ assert record.partition == 0
+ assert record.offset == 15
+ assert record.timestamp is not None
+ convert_time = int(round(record.timestamp.timestamp() * 1000))
+ assert convert_time == 1545084650987
+ assert record.timestampType == "CREATE_TIME"
+ assert record.key == b"recordKey"
+ assert record.value == '{"key":"value"}'
+ assert len(record.headers) == 1
+ assert record.headers[0]["headerKey"] == b"headerValue"
+
+
+@event_parser(model=KafkaMskEventModel)
+def handle_msk_event(event: KafkaMskEventModel, _: LambdaContext):
+ return event
+
+
+def test_kafka_msk_event():
+ json_event = load_event("kafkaEventMsk.json")
+ event: KafkaMskEventModel = handle_msk_event(json_event, LambdaContext())
+ assert event.eventSource == "aws:kafka"
+ bootstrap_servers = [
+ "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ ]
+ assert event.bootstrapServers == bootstrap_servers
+ assert (
+ event.eventSourceArn
+ == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
+ )
+
+ records = list(event.records["mytopic-0"])
+ assert len(records) == 1
+ record: KafkaRecordModel = records[0]
+ assert record.topic == "mytopic"
+ assert record.partition == 0
+ assert record.offset == 15
+ assert record.timestamp is not None
+ convert_time = int(round(record.timestamp.timestamp() * 1000))
+ assert convert_time == 1545084650987
+ assert record.timestampType == "CREATE_TIME"
+ assert record.key == b"recordKey"
+ assert record.value == '{"key":"value"}'
+ assert len(record.headers) == 1
+ assert record.headers[0]["headerKey"] == b"headerValue"
diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py
index 00dd5100f67..dbef57162e2 100644
--- a/tests/functional/test_data_classes.py
+++ b/tests/functional/test_data_classes.py
@@ -1174,7 +1174,7 @@ def test_kafka_msk_event():
def test_kafka_self_managed_event():
event = KafkaEvent(load_event("kafkaEventSelfManaged.json"))
- assert event.event_source == "aws:aws:SelfManagedKafka"
+ assert event.event_source == "aws:SelfManagedKafka"
bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" # noqa E501