Skip to content

Commit ec35c7c

Browse files
ryandeivertrubenfonseca
authored andcommitted
add KinesisFirehoseEvent data class
1 parent a8b9f17 commit ec35c7c

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

aws_lambda_powertools/utilities/data_classes/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .event_bridge_event import EventBridgeEvent
1414
from .event_source import event_source
1515
from .kafka_event import KafkaEvent
16+
from .kinesis_firehose_event import KinesisFirehoseEvent
1617
from .kinesis_stream_event import KinesisStreamEvent
1718
from .lambda_function_url_event import LambdaFunctionUrlEvent
1819
from .s3_event import S3Event
@@ -32,6 +33,7 @@
3233
"DynamoDBStreamEvent",
3334
"EventBridgeEvent",
3435
"KafkaEvent",
36+
"KinesisFirehoseEvent",
3537
"KinesisStreamEvent",
3638
"LambdaFunctionUrlEvent",
3739
"S3Event",
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import base64
2+
import json
3+
from typing import Iterator
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
7+
8+
class KinesisFirehoseRecordMetadata(DictWrapper):
9+
@property
10+
def shard_id(self) -> str:
11+
"""Kinesis stream shard ID; present only when Kinesis Stream is source"""
12+
return self.get("shardId")
13+
14+
@property
15+
def partition_key(self) -> str:
16+
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
17+
return self.get("partitionKey")
18+
19+
@property
20+
def approximate_arrival_timestamp(self) -> str:
21+
"""Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
22+
return self.get("approximateArrivalTimestamp")
23+
24+
@property
25+
def sequence_number(self) -> str:
26+
"""Kinesis stream sequence number; present only when Kinesis Stream is source"""
27+
return self.get("sequenceNumber")
28+
29+
@property
30+
def subsequence_number(self) -> str:
31+
"""Kinesis stream sub-sequence number; present only when Kinesis Stream is source
32+
33+
Note: this will only be present for Kinesis streams using record aggregation
34+
"""
35+
return self.get("subsequenceNumber")
36+
37+
38+
class KinesisFirehoseRecord(DictWrapper):
39+
@property
40+
def approximate_arrival_timestamp(self) -> float:
41+
"""The approximate time that the record was inserted into the delivery stream"""
42+
return float(self["approximateArrivalTimestamp"])
43+
44+
@property
45+
def record_id(self) -> str:
46+
"""Record ID; uniquely identifies this record within the current batch"""
47+
return self["recordId"]
48+
49+
@property
50+
def data(self) -> str:
51+
"""The data blob, base64-encoded"""
52+
return self["data"]
53+
54+
@property
55+
def metadata(self) -> KinesisFirehoseRecordMetadata:
56+
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
57+
return KinesisFirehoseRecordMetadata(self.get('kinesisRecordMetadata', {}))
58+
59+
@property
60+
def data_as_bytes(self) -> bytes:
61+
"""Decoded base64-encoded data as bytes"""
62+
return base64.b64decode(self.data)
63+
64+
@property
65+
def data_as_text(self) -> str:
66+
"""Decoded base64-encoded data as text"""
67+
return self.data_as_bytes.decode("utf-8")
68+
69+
@property
70+
def data_as_json(self) -> dict:
71+
"""Decoded base64-encoded data loaded to json"""
72+
if self._json_data is None:
73+
self._json_data = json.loads(self.data_as_text)
74+
return self._json_data
75+
76+
77+
class KinesisFirehoseEvent(DictWrapper):
78+
"""Kinesis Data Firehose event
79+
80+
Documentation:
81+
--------------
82+
- https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
83+
"""
84+
85+
@property
86+
def invocation_id(self) -> str:
87+
"""Unique ID for for Lambda invocation"""
88+
return self["invocationId"]
89+
90+
@property
91+
def delivery_stream_arn(self) -> str:
92+
"""ARN of the Firehose Data Firehose Delivery Stream"""
93+
return self["deliveryStreamArn"]
94+
95+
@property
96+
def source_kinesis_stream_arn(self) -> str:
97+
"""ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
98+
return self.get("sourceKinesisStreamArn")
99+
100+
@property
101+
def region(self) -> str:
102+
"""AWS region where the event originated eg: us-east-1"""
103+
return self["region"]
104+
105+
@property
106+
def records(self) -> Iterator[KinesisFirehoseRecord]:
107+
for record in self["records"]:
108+
yield KinesisFirehoseRecord(record)

0 commit comments

Comments
 (0)