-
Notifications
You must be signed in to change notification settings - Fork 429
feat(data-classes): add KafkaEvent and KafkaEventRecord #1485
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
leandrodamascena
merged 13 commits into
aws-powertools:develop
from
lyoung-confluent:patch-2
Sep 5, 2022
Merged
Changes from 7 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
91623ee
Add KafkaEvent and KafkaEventRecord data class
lyoung-confluent 2f83089
Add KafkaEvent to __init__.py
lyoung-confluent c3d1724
Mirror the pattern from kinesis/rabbitmq
lyoung-confluent 624d652
Add docs
lyoung-confluent c5cf3d7
Add tests, fix implementation bugs
lyoung-confluent ed34c98
Fix typing on bootstrap_servers result
lyoung-confluent 4c8b957
Fix typing, mypy runs locally now
lyoung-confluent 3d9dd70
address PR feedback
lyoung-confluent 0019737
change kafkaEvent from self-managed to MSK
lyoung-confluent a864e25
adjust tests to use MSK event
lyoung-confluent 041b454
split bootstrap_servers into decoded_bootstrap_servers
lyoung-confluent f00d2c6
Update tests to use decode_bootstrap_servers
lyoung-confluent bd9c8a6
feat(data-classes): adding more tests to kafka events
leandrodamascena File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
aws_lambda_powertools/utilities/data_classes/kafka_event.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import base64 | ||
import json | ||
from typing import Any, Dict, Iterator, List, Optional | ||
|
||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
|
||
class KafkaEventRecord(DictWrapper): | ||
@property | ||
def topic(self) -> str: | ||
"""The Kafka topic.""" | ||
return self["topic"] | ||
|
||
@property | ||
def partition(self) -> str: | ||
"""The Kafka record parition.""" | ||
return self["partition"] | ||
|
||
@property | ||
def offset(self) -> str: | ||
"""The Kafka record offset.""" | ||
return self["offset"] | ||
|
||
@property | ||
def timestamp(self) -> int: | ||
"""The Kafka record timestamp.""" | ||
return self["timestamp"] | ||
|
||
@property | ||
def timestamp_type(self) -> str: | ||
"""The Kafka record timestamp type.""" | ||
return self["timestampType"] | ||
|
||
@property | ||
def key(self) -> str: | ||
"""The raw (base64 encoded) Kafka record key.""" | ||
return self["key"] | ||
|
||
@property | ||
def decoded_key(self) -> bytes: | ||
"""Decode the base64 encoded key as bytes.""" | ||
return base64.b64decode(self.key) | ||
|
||
@property | ||
def value(self) -> str: | ||
"""The raw (base64 encoded) Kafka record value.""" | ||
return self["value"] | ||
|
||
@property | ||
def decoded_value(self) -> bytes: | ||
"""Decodes the base64 encoded value as bytes.""" | ||
return base64.b64decode(self.value) | ||
|
||
@property | ||
def json_value(self) -> Any: | ||
"""Decodes the text encoded data as JSON.""" | ||
if self._json_data is None: | ||
self._json_data = json.loads(self.decoded_value.decode("utf-8")) | ||
return self._json_data | ||
|
||
@property | ||
def headers(self) -> List[Dict[str, List[int]]]: | ||
"""The raw Kafka record headers.""" | ||
return self["headers"] | ||
|
||
@property | ||
def decoded_headers(self) -> Dict[str, bytes]: | ||
"""Decodes the headers as a single dictionary.""" | ||
return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()} | ||
|
||
def get_header_value( | ||
self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True | ||
) -> Optional[bytes]: | ||
"""Get a decoded header value by name.""" | ||
if case_sensitive: | ||
return self.decoded_headers.get(name, default_value) | ||
name_lower = name.lower() | ||
|
||
return next( | ||
# Iterate over the dict and do a case-insensitive key comparison | ||
(value for key, value in self.decoded_headers.items() if key.lower() == name_lower), | ||
# Default value is returned if no matches was found | ||
default_value, | ||
) | ||
|
||
|
||
class KafkaEvent(DictWrapper): | ||
"""Self-managed Apache Kafka event trigger | ||
Documentation: | ||
-------------- | ||
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html | ||
""" | ||
|
||
@property | ||
def event_source(self) -> str: | ||
"""The AWS service from which the Kafka event record originated.""" | ||
return self["eventSource"] | ||
|
||
@property | ||
def event_source_arn(self) -> str: | ||
"""The AWS service ARN from which the Kafka event record originated.""" | ||
return self["eventSourceArn"] | ||
|
||
@property | ||
def bootstrap_servers(self) -> Optional[List[str]]: | ||
lyoung-confluent marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""The Kafka bootstrap URL.""" | ||
servers = self.get("bootstrapServers") | ||
if not servers: | ||
return None | ||
return servers.split(",") | ||
|
||
@property | ||
def records(self) -> Iterator[KafkaEventRecord]: | ||
"""The Kafka records.""" | ||
for chunk in self["records"].values(): | ||
for record in chunk: | ||
yield KafkaEventRecord(record) | ||
|
||
@property | ||
def record(self) -> KafkaEventRecord: | ||
"""The next Kafka record.""" | ||
return next(self.records) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
{ | ||
"eventSource":"aws:SelfManagedKafka", | ||
"bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", | ||
"records":{ | ||
"mytopic-0":[ | ||
{ | ||
"topic":"mytopic", | ||
"partition":0, | ||
"offset":15, | ||
"timestamp":1545084650987, | ||
"timestampType":"CREATE_TIME", | ||
"key":"cmVjb3JkS2V5", | ||
"value":"eyJrZXkiOiJ2YWx1ZSJ9", | ||
"headers":[ | ||
{ | ||
"headerKey":[ | ||
104, | ||
101, | ||
97, | ||
100, | ||
101, | ||
114, | ||
86, | ||
97, | ||
108, | ||
117, | ||
101 | ||
] | ||
} | ||
] | ||
} | ||
] | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.