-
Notifications
You must be signed in to change notification settings - Fork 429
feat(data_classes): add KinesisFirehoseEvent #1540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
ec35c7c
ab36af5
4855551
c9b93fc
642734f
ebe6714
2416d8d
e8e6417
237f96d
a9b24dd
cf13bb8
085873a
f04a81b
e10f62a
b95cd97
a3d1410
2ab66fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
import base64 | ||
import json | ||
from typing import Iterator, Optional | ||
|
||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
|
||
class KinesisFirehoseRecordMetadata(DictWrapper):
    """Read-only view over the ``kinesisRecordMetadata`` section of a Firehose record.

    The wrapper is built from the whole record dict; every public accessor
    reads from the nested ``kinesisRecordMetadata`` mapping and returns
    ``None`` when the requested field (or the whole section) is missing.
    """

    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source

        Returns an empty dict when the ``kinesisRecordMetadata`` key is absent.
        """
        # The accessors below are typed Optional[str]; falling back to an empty
        # mapping lets them return None instead of raising KeyError when the
        # record carries no Kinesis stream metadata.
        return self.get("kinesisRecordMetadata") or {}

    @property
    def shard_id(self) -> Optional[str]:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata.get("shardId")

    @property
    def partition_key(self) -> Optional[str]:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata.get("partitionKey")

    @property
    def approximate_arrival_timestamp(self) -> Optional[str]:
        """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
        return self._metadata.get("approximateArrivalTimestamp")

    @property
    def sequence_number(self) -> Optional[str]:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata.get("sequenceNumber")

    @property
    def subsequence_number(self) -> Optional[str]:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata.get("subsequenceNumber")
ryandeivert marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class KinesisFirehoseRecord(DictWrapper):
    """Read-only view over a single entry of a Firehose event's ``records`` list."""

    @property
    def approximate_arrival_timestamp(self) -> float:
        """Approximate time the record was inserted into the delivery stream, as a float"""
        raw_timestamp = self["approximateArrivalTimestamp"]
        return float(raw_timestamp)

    @property
    def record_id(self) -> str:
        """Identifier of this record, unique within the current batch"""
        identifier = self["recordId"]
        return identifier

    @property
    def data(self) -> str:
        """The record payload exactly as received (base64-encoded)"""
        encoded_blob = self["data"]
        return encoded_blob

    @property
    def metadata(self) -> KinesisFirehoseRecordMetadata:
        """Optional: Kinesis stream metadata for this record; present only when Kinesis Stream is source"""
        # The metadata wrapper receives the whole record dict and performs the
        # nested lookup itself.
        record_dict = self._data
        return KinesisFirehoseRecordMetadata(record_dict)

    @property
    def data_as_bytes(self) -> bytes:
        """Payload after base64 decoding, as raw bytes"""
        encoded_blob = self.data
        return base64.b64decode(encoded_blob)

    @property
    def data_as_text(self) -> str:
        """Payload after base64 decoding, interpreted as UTF-8 text"""
        raw_bytes = self.data_as_bytes
        return str(raw_bytes, "utf-8")

    @property
    def data_as_json(self) -> dict:
        """Payload after base64 decoding, parsed as JSON (parsed once, then reused)"""
        parsed = self._json_data
        if parsed is None:
            # First access: parse the text and remember the result on the instance.
            parsed = json.loads(self.data_as_text)
            self._json_data = parsed
        return parsed
Comment on lines
+75
to
+79
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we need to review this piece of code. We'll discuss this in our daily sync on Monday and I'll come back here with updates. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. EAFP |
||
|
||
|
||
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for the Lambda invocation"""
        invocation = self["invocationId"]
        return invocation

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose delivery stream"""
        stream_arn = self["deliveryStreamArn"]
        return stream_arn

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the source Kinesis stream; None unless a Kinesis stream is the source"""
        source_arn = self.get("sourceKinesisStreamArn")
        return source_arn

    @property
    def region(self) -> str:
        """AWS region where the event originated, e.g. us-east-1"""
        origin_region = self["region"]
        return origin_region

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        """Lazily yield each entry of ``records`` wrapped in a KinesisFirehoseRecord"""
        yield from (KinesisFirehoseRecord(entry) for entry in self["records"])
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import base64 | ||
import json | ||
|
||
from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent, event_source | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
|
||
@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    """Re-encode every incoming Firehose record and return the transformed batch.

    Each record's payload is parsed as JSON, serialized again, and returned
    base64-encoded under the same ``recordId`` with ``result`` set to ``"Ok"``.

    Returns:
        A dict with a single ``records`` key holding one entry per input record.

    Raises:
        json.JSONDecodeError: if a record's decoded payload is not valid JSON.
    """
    result = []

    for record in event.records:
        # if data was delivered as json; caches loaded value
        data = record.data_as_json

        encoded = base64.b64encode(json.dumps(data).encode("utf-8"))

        processed_record = {
            "recordId": record.record_id,
            # b64encode returns bytes; the handler's return value must be JSON
            # serializable, so hand back the base64 payload as a str.
            "data": encoded.decode("utf-8"),
            "result": "Ok",
        }

        result.append(processed_record)

    # return transformed records
    return {"records": result}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
{ | ||
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", | ||
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", | ||
"region": "us-east-2", | ||
ryandeivert marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"records": [ | ||
{ | ||
"data": "SGVsbG8gV29ybGQ=", | ||
"recordId": "record1", | ||
"approximateArrivalTimestamp": 1510772160000, | ||
"kinesisRecordMetadata": { | ||
"shardId": "shardId-000000000000", | ||
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", | ||
"approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z", | ||
ryandeivert marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154", | ||
"subsequenceNumber": "" | ||
} | ||
}, | ||
{ | ||
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9", | ||
"recordId": "record2", | ||
"approximateArrivalTimestamp": 151077216000, | ||
"kinesisRecordMetadata": { | ||
"shardId": "shardId-000000000001", | ||
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", | ||
"approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z", | ||
ryandeivert marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155", | ||
"subsequenceNumber": "" | ||
} | ||
} | ||
] | ||
} |
Uh oh!
There was an error while loading. Please reload this page.