feat(parser): add KafkaMskEventModel and KafkaSelfManagedEventModel #1499
Merged
Commits (27)
fef70b0
docs(core): match code snippet name with filename (#1286)
heitorlessa d918387
fix(ci): accept core arg in label related issue workflow
heitorlessa 13558d9
chore(ci): update changelog with latest changes
40ce509
docs(apigateway): removes duplicate admonition (#1426)
peterschutt 5951ab0
chore(ci): update changelog with latest changes
6b23903
docs(parser): minor grammar fix (#1427)
peterschutt 6380b63
chore(ci): update changelog with latest changes
8e24ae3
chore(ci): update changelog with latest changes
af99733
Merge branch 'develop' of https://github.com/ran-isenberg/aws-lambda-…
ran-isenberg 6c405fa
merge
f2a6c9b
update changelog with latest changes
70de885
feature: Kafka Parser support
574a389
feature: Kafka Parser support
a0f778c
Merge branch 'develop' of https://github.com/awslabs/aws-lambda-power…
c0df797
cr fixes
a918a94
cr fixes
06c94c9
docs fix
61d833c
fixes
4e4282c
fixes
9ee356a
a
afbd394
fixes
139ffd7
Update tests/functional/parser/test_kafka.py
ran-isenberg bfe0803
Update tests/functional/parser/test_kafka.py
ran-isenberg 50571b7
Update tests/functional/parser/test_kafka.py
ran-isenberg 75e84bd
cr fixes
86ef6de
feat(parser): small changes
leandrodamascena f41c82c
feat(parser): changes in envelope
leandrodamascena
@@ -0,0 +1,41 @@
import logging
from typing import Any, Dict, List, Optional, Type, Union

from ..models import KafkaMskEventModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class KafkaEnvelope(BaseEnvelope):
    """Kafka event envelope to extract data within the `value` key of each record.

    The record's `value` is a string, though it can also be a JSON-encoded string.
    Regardless of its type, it will be parsed into a BaseModel object.

    Note: Records will all be parsed the same way, so if the model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa).
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with Kafka event model {KafkaMskEventModel}")
        parsed_envelope: KafkaMskEventModel = KafkaMskEventModel.parse_obj(data)
        logger.debug(f"Parsing Kafka event records in `value` with {model}")
        ret_list = []
        for records in parsed_envelope.records.values():
            ret_list += [self._parse(data=record.value, model=model) for record in records]
        return ret_list
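For context, a minimal usage sketch of the new envelope. `OrderItem` and `handler` are hypothetical names introduced here for illustration; the sketch assumes each record's `value` is a JSON-encoded string matching the business model, as in the tests further down.

from typing import List, Optional

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext


class OrderItem(BaseModel):
    # hypothetical business model for the payload carried in each record's `value`
    key: str


@event_parser(model=OrderItem, envelope=envelopes.KafkaEnvelope)
def handler(event: List[Optional[OrderItem]], context: LambdaContext):
    # the envelope flattens all topic partitions into a single list of parsed values
    for order in event:
        print(order.key)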
@@ -0,0 +1,65 @@
from datetime import datetime
from typing import Dict, List, Type, Union

from pydantic import BaseModel, validator

from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string
from aws_lambda_powertools.utilities.parser.types import Literal

SERVERS_DELIMITER = ","


class KafkaRecordModel(BaseModel):
    topic: str
    partition: int
    offset: int
    timestamp: datetime
    timestampType: str
    key: bytes
    value: Union[str, Type[BaseModel]]
    headers: List[Dict[str, bytes]]

    # validators
    _decode_key = validator("key", allow_reuse=True)(base64_decode)

    @validator("value", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        as_bytes = base64_decode(value)
        return bytes_to_string(as_bytes)

    @validator("headers", pre=True, allow_reuse=True)
    def decode_headers_list(cls, value):
        for header in value:
            for key, values in header.items():
                header[key] = bytes(values)
        return value


class KafkaBaseEventModel(BaseModel):
    bootstrapServers: List[str]
    records: Dict[str, List[KafkaRecordModel]]

    @validator("bootstrapServers", pre=True, allow_reuse=True)
    def split_servers(cls, value):
        return None if not value else value.split(SERVERS_DELIMITER)


class KafkaSelfManagedEventModel(KafkaBaseEventModel):
    """Self-managed Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    """

    eventSource: Literal["aws:SelfManagedKafka"]


class KafkaMskEventModel(KafkaBaseEventModel):
    """Fully-managed AWS Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    eventSource: Literal["aws:kafka"]
    eventSourceArn: str
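To illustrate what the validators above do (base64-decoding `key` and `value`, converting header byte lists, and splitting the comma-delimited `bootstrapServers` string), here is a minimal sketch with a hand-built payload; the field values mirror those asserted against the kafkaEventMsk.json fixture in the tests below, and the dict itself is assembled here only for illustration.

import base64

from aws_lambda_powertools.utilities.parser.models import KafkaMskEventModel

# Hand-built, minimal MSK payload; values mirror the kafkaEventMsk.json test fixture.
raw_event = {
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,"
    "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": base64.b64encode(b"recordKey").decode(),
                "value": base64.b64encode(b'{"key":"value"}').decode(),
                "headers": [{"headerKey": list(b"headerValue")}],
            }
        ]
    },
}

event = KafkaMskEventModel.parse_obj(raw_event)
record = event.records["mytopic-0"][0]
assert len(event.bootstrapServers) == 2                   # comma-delimited string was split
assert record.key == b"recordKey"                         # base64-decoded to bytes
assert record.value == '{"key":"value"}'                  # base64-decoded to str
assert record.headers[0]["headerKey"] == b"headerValue"   # list of ints converted to bytes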
tests/functional/parser/test_kafka.py
@@ -0,0 +1,88 @@
from typing import List

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import (
    KafkaMskEventModel,
    KafkaRecordModel,
    KafkaSelfManagedEventModel,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyLambdaKafkaBusiness
from tests.functional.utils import load_event


@event_parser(model=MyLambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)
def handle_lambda_kafka_with_envelope(event: List[MyLambdaKafkaBusiness], _: LambdaContext):
    assert event[0].key == "value"
    assert len(event) == 1


@event_parser(model=KafkaSelfManagedEventModel)
def handle_kafka_event(event: KafkaSelfManagedEventModel, _: LambdaContext):
    return event


def test_kafka_event_with_envelope():
    event = load_event("kafkaEventMsk.json")
    handle_lambda_kafka_with_envelope(event, LambdaContext())


def test_self_managed_kafka_event():
    json_event = load_event("kafkaEventSelfManaged.json")
    event: KafkaSelfManagedEventModel = handle_kafka_event(json_event, LambdaContext())
    assert event.eventSource == "aws:SelfManagedKafka"
    bootstrap_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == bootstrap_servers

    records = list(event.records["mytopic-0"])
    assert len(records) == 1
    record: KafkaRecordModel = records[0]
    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestamp is not None
    convert_time = int(round(record.timestamp.timestamp() * 1000))
    assert convert_time == 1545084650987
    assert record.timestampType == "CREATE_TIME"
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"


@event_parser(model=KafkaMskEventModel)
def handle_msk_event(event: KafkaMskEventModel, _: LambdaContext):
    return event


def test_kafka_msk_event():
    json_event = load_event("kafkaEventMsk.json")
    event: KafkaMskEventModel = handle_msk_event(json_event, LambdaContext())
    assert event.eventSource == "aws:kafka"
    bootstrap_servers = [
        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    ]
    assert event.bootstrapServers == bootstrap_servers
    assert (
        event.eventSourceArn
        == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
    )

    records = list(event.records["mytopic-0"])
    assert len(records) == 1
    record: KafkaRecordModel = records[0]
    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestamp is not None
    convert_time = int(round(record.timestamp.timestamp() * 1000))
    assert convert_time == 1545084650987
    assert record.timestampType == "CREATE_TIME"
    assert record.key == b"recordKey"
    assert record.value == '{"key":"value"}'
    assert len(record.headers) == 1
    assert record.headers[0]["headerKey"] == b"headerValue"