-
Notifications
You must be signed in to change notification settings - Fork 430
feat(parser): add KafkaMskEventModel and KafkaSelfManagedEventModel #1499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
fef70b0
d918387
13558d9
40ce509
5951ab0
6b23903
6380b63
8e24ae3
af99733
6c405fa
f2a6c9b
70de885
574a389
a0f778c
c0df797
a918a94
06c94c9
61d833c
4e4282c
9ee356a
afbd394
139ffd7
bfe0803
50571b7
75e84bd
86ef6de
f41c82c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import logging | ||
from typing import Any, Dict, List, Optional, Type, Union | ||
|
||
from ..models import KafkaEventModel | ||
from ..types import Model | ||
from .base import BaseEnvelope | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class KafkaEnvelope(BaseEnvelope):
    """Kafka event envelope to extract data within body key
    The record's body parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with Kafka event model {KafkaEventModel}")
        parsed_envelope: KafkaEventModel = KafkaEventModel.parse_obj(data)
        logger.debug(f"Parsing Kafka event records in `value` with {model}")
        # Records are grouped per topic-partition key; flatten all groups and parse
        # each record's (already base64-decoded) `value` payload with the user model.
        return [
            self._parse(data=record.value, model=model)
            for records in parsed_envelope.records.values()
            for record in records
        ]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import base64 | ||
import logging | ||
from binascii import Error as BinAsciiError | ||
from datetime import datetime | ||
from typing import Dict, List, Optional, Type, Union | ||
|
||
from pydantic import BaseModel, validator | ||
|
||
from aws_lambda_powertools.utilities.parser.types import Literal | ||
|
||
SERVERS_DELIMITER = "," | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def _base64_decode(value: str) -> bytes:
    """Decode a base64-encoded string into raw bytes.

    Parameters
    ----------
    value : str
        Base64-encoded payload as delivered inside a Kafka event record

    Returns
    -------
    bytes
        Decoded payload

    Raises
    ------
    ValueError
        When `value` is not valid base64 input
    """
    try:
        logger.debug("Decoding base64 Kafka record item before parsing")
        return base64.b64decode(value)
    except (BinAsciiError, TypeError) as exc:
        # Chain the original exception so the root cause stays visible in tracebacks
        raise ValueError("base64 decode failed") from exc
|
||
|
||
def _bytes_to_string(value: bytes) -> str: | ||
try: | ||
return value.decode("utf-8") | ||
except (BinAsciiError, TypeError): | ||
raise ValueError("base64 UTF-8 decode failed") | ||
|
||
|
||
class KafkaRecordModel(BaseModel):
    """A single Kafka record as delivered inside a Lambda Kafka/MSK event.

    `key`, `value`, and header values arrive encoded; the validators below
    decode them so consumers work with ready-to-use values.
    """

    topic: str
    partition: int
    offset: int
    timestamp: datetime
    timestampType: str
    key: bytes  # base64-decoded by the `_decode_key` validator
    value: Union[str, Type[BaseModel]]  # decoded to str; envelopes may parse it further
    headers: List[Dict[str, bytes]]

    # validators
    # Reuses the module-level helper to base64-decode `key` into raw bytes
    _decode_key = validator("key", allow_reuse=True)(_base64_decode)

    @validator("value", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        # Base64-decode the payload, then decode the resulting bytes as UTF-8 text
        as_bytes = _base64_decode(value)
        return _bytes_to_string(as_bytes)

    @validator("headers", pre=True, allow_reuse=True)
    def decode_headers_list(cls, value):
        # Convert each header value into bytes via bytes(values).
        # NOTE(review): mutates the incoming dicts in place — presumably fine for a
        # pre-validator since pydantic owns the parsed input here; confirm if reused.
        for header in value:
            for key, values in header.items():
                header[key] = bytes(values)
        return value
|
||
|
||
class KafkaBaseEventModel(BaseModel):
    """Shared fields for Kafka event triggers (self-managed Kafka and MSK).

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    """

    # Always present in the event payload (per review: not optional).
    # Arrives as a comma-separated string; split into a list by the validator below.
    bootstrapServers: List[str]
    # Records keyed by "topic-partition" identifier
    records: Dict[str, List[KafkaRecordModel]]

    @validator("bootstrapServers", pre=True, allow_reuse=True)
    def split_servers(cls, value):
        """Split the comma-separated bootstrap servers string into a list."""
        return None if not value else value.split(SERVERS_DELIMITER)
|
||
|
||
class KafkaEventModel(KafkaBaseEventModel):
    """Self-managed Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    """

    # Discriminates self-managed Kafka events from MSK events ("aws:kafka")
    eventSource: Literal["aws:SelfManagedKafka"]
|
||
|
||
class MskEventModel(KafkaBaseEventModel):
    """Fully-managed AWS Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    # Discriminates MSK events from self-managed Kafka ("aws:SelfManagedKafka")
    eventSource: Literal["aws:kafka"]
    # ARN of the event source (the MSK cluster); only present for MSK triggers
    eventSourceArn: str
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from typing import List | ||
|
||
from aws_lambda_powertools.utilities.parser import envelopes, event_parser | ||
from aws_lambda_powertools.utilities.parser.models import KafkaEventModel, KafkaRecordModel, MskEventModel | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
from tests.functional.parser.schemas import MyALambdaKafkaBusiness | ||
ran-isenberg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from tests.functional.utils import load_event | ||
|
||
|
||
@event_parser(model=MyALambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
def handle_lambda_kafka_with_envelope(event: List[MyALambdaKafkaBusiness], _: LambdaContext):
    """Handler used by tests: the envelope must yield one parsed business record."""
    assert len(event) == 1
    assert event[0].key == "value"
|
||
|
||
@event_parser(model=KafkaEventModel)
def handle_kafka_event(event: KafkaEventModel, _: LambdaContext):
    # Identity handler: returns the parsed model so tests can inspect its fields
    return event
|
||
|
||
def test_kafka_event_with_envelope():
    """The envelope-based handler parses a self-managed Kafka event without error."""
    raw_event = load_event("kafkaEventSelfManaged.json")
    handle_lambda_kafka_with_envelope(raw_event, LambdaContext())
|
||
|
||
def test_self_managed_kafka_event():
    """Parse a self-managed Kafka event and verify every decoded field."""
    raw_event = load_event("kafkaEventSelfManaged.json")
    event: KafkaEventModel = handle_kafka_event(raw_event, LambdaContext())

    assert event.eventSource == "aws:SelfManagedKafka"
    expected_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == expected_servers

    topic_records = list(event.records["mytopic-0"])
    assert len(topic_records) == 1

    record: KafkaRecordModel = topic_records[0]
    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestamp is not None
    # The parsed datetime round-trips back to the original epoch-millis value
    epoch_millis = int(round(record.timestamp.timestamp() * 1000))
    assert epoch_millis == 1545084650987
    assert record.timestampType == "CREATE_TIME"
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"
|
||
|
||
@event_parser(model=MskEventModel)
def handle_msk_event(event: MskEventModel, _: LambdaContext):
    # Identity handler: returns the parsed model so tests can inspect its fields
    return event
|
||
|
||
def test_msk_event():
    """Parse an MSK event and verify the source, ARN, and every decoded record field."""
    raw_event = load_event("kafkaEventMsk.json")
    event: MskEventModel = handle_msk_event(raw_event, LambdaContext())

    assert event.eventSource == "aws:kafka"
    expected_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == expected_servers
    expected_arn = (
        "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
    )
    assert event.eventSourceArn == expected_arn

    topic_records = list(event.records["mytopic-0"])
    assert len(topic_records) == 1

    record: KafkaRecordModel = topic_records[0]
    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestamp is not None
    # The parsed datetime round-trips back to the original epoch-millis value
    epoch_millis = int(round(record.timestamp.timestamp() * 1000))
    assert epoch_millis == 1545084650987
    assert record.timestampType == "CREATE_TIME"
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we are adding new features and always improving our code, what do you think about moving these 2 functions to the shared functions file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, np