Skip to content

Commit 3811356

Browse files
author
Michael Brewer
committed
feat(trigger): Kinesis stream event
Add support for Kinesis stream events with a helper method to decode data
1 parent cc180d3 commit 3811356

File tree

5 files changed

+171
-0
lines changed

5 files changed

+171
-0
lines changed

aws_lambda_powertools/utilities/trigger/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from .cloud_watch_logs_event import CloudWatchLogsEvent
33
from .dynamo_db_stream_event import DynamoDBStreamEvent
44
from .event_bridge_event import EventBridgeEvent
5+
from .kinesis_stream_event import KinesisStreamEvent
56
from .s3_event import S3Event
67
from .ses_event import SESEvent
78
from .sns_event import SNSEvent
@@ -13,6 +14,7 @@
1314
"CloudWatchLogsEvent",
1415
"DynamoDBStreamEvent",
1516
"EventBridgeEvent",
17+
"KinesisStreamEvent",
1618
"S3Event",
1719
"SESEvent",
1820
"SNSEvent",
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import base64
2+
import json
3+
from typing import Iterator
4+
5+
from aws_lambda_powertools.utilities.trigger.common import DictWrapper
6+
7+
8+
class KinesisStreamRecordPayload(DictWrapper):
9+
@property
10+
def approximate_arrival_timestamp(self) -> float:
11+
"""The approximate time that the record was inserted into the stream"""
12+
return float(self["kinesis"]["approximateArrivalTimestamp"])
13+
14+
@property
15+
def data(self) -> str:
16+
"""The data blob"""
17+
return self["kinesis"]["data"]
18+
19+
@property
20+
def kinesis_schema_version(self) -> str:
21+
"""Schema version for the record"""
22+
return self["kinesis"]["kinesisSchemaVersion"]
23+
24+
@property
25+
def partition_key(self) -> str:
26+
"""Identifies which shard in the stream the data record is assigned to"""
27+
return self["kinesis"]["partitionKey"]
28+
29+
@property
30+
def sequence_number(self) -> str:
31+
"""The unique identifier of the record within its shard"""
32+
return self["kinesis"]["sequenceNumber"]
33+
34+
def data_as_text(self) -> str:
35+
"""Decode binary encoded data as text"""
36+
return base64.b64decode(self.data).decode("utf-8")
37+
38+
def data_as_json(self) -> dict:
39+
"""Decode binary encoded data as json"""
40+
return json.loads(self.data_as_text())
41+
42+
43+
class KinesisStreamRecord(DictWrapper):
44+
@property
45+
def aws_region(self) -> str:
46+
"""AWS region where the event originated eg: us-east-1"""
47+
return self["awsRegion"]
48+
49+
@property
50+
def event_id(self) -> str:
51+
"""A globally unique identifier for the event that was recorded in this stream record."""
52+
return self["eventID"]
53+
54+
@property
55+
def event_name(self) -> str:
56+
"""Event type eg: aws:kinesis:record"""
57+
return self["eventName"]
58+
59+
@property
60+
def event_source(self) -> str:
61+
"""The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis"""
62+
return self["eventSource"]
63+
64+
@property
65+
def event_source_arn(self) -> str:
66+
"""The Amazon Resource Name (ARN) of the event source"""
67+
return self["eventSourceARN"]
68+
69+
@property
70+
def event_version(self) -> str:
71+
"""The eventVersion key value contains a major and minor version in the form <major>.<minor>."""
72+
return self["eventVersion"]
73+
74+
@property
75+
def invoke_identity_arn(self) -> str:
76+
"""The ARN for the identity used to invoke the Lambda Function"""
77+
return self["invokeIdentityArn"]
78+
79+
@property
80+
def kinesis(self) -> KinesisStreamRecordPayload:
81+
"""Underlying Kinesis record associated with the event"""
82+
return KinesisStreamRecordPayload(self._data)
83+
84+
85+
class KinesisStreamEvent(dict):
86+
"""Kinesis stream event
87+
88+
Documentation:
89+
--------------
90+
- https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
91+
"""
92+
93+
@property
94+
def records(self) -> Iterator[KinesisStreamRecord]:
95+
for record in self["Records"]:
96+
yield KinesisStreamRecord(record)

aws_lambda_powertools/utilities/trigger/ses_event.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def event_source(self) -> str:
184184

185185
@property
186186
def event_version(self) -> str:
187+
"""The eventVersion key value contains a major and minor version in the form <major>.<minor>."""
187188
return self["eventVersion"]
188189

189190
@property

tests/events/kinesisStreamEvent.json

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"Records": [
3+
{
4+
"kinesis": {
5+
"kinesisSchemaVersion": "1.0",
6+
"partitionKey": "1",
7+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
8+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
9+
"approximateArrivalTimestamp": 1545084650.987
10+
},
11+
"eventSource": "aws:kinesis",
12+
"eventVersion": "1.0",
13+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
14+
"eventName": "aws:kinesis:record",
15+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
16+
"awsRegion": "us-east-2",
17+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
18+
},
19+
{
20+
"kinesis": {
21+
"kinesisSchemaVersion": "1.0",
22+
"partitionKey": "1",
23+
"sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
24+
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
25+
"approximateArrivalTimestamp": 1545084711.166
26+
},
27+
"eventSource": "aws:kinesis",
28+
"eventVersion": "1.0",
29+
"eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
30+
"eventName": "aws:kinesis:record",
31+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
32+
"awsRegion": "us-east-2",
33+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
34+
}
35+
]
36+
}

tests/functional/test_lambda_trigger_events.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import base64
12
import json
23
import os
34
from secrets import compare_digest
@@ -7,6 +8,7 @@
78
APIGatewayProxyEventV2,
89
CloudWatchLogsEvent,
910
EventBridgeEvent,
11+
KinesisStreamEvent,
1012
S3Event,
1113
SESEvent,
1214
SNSEvent,
@@ -603,3 +605,37 @@ def test_api_gateway_proxy_v2_get_header_value():
603605

604606
value = event.get_header_value("unknown")
605607
assert value is None
608+
609+
610+
def test_kinesis_stream_event():
611+
event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))
612+
613+
records = list(event.records)
614+
assert len(records) == 2
615+
record = records[0]
616+
617+
assert record.aws_region == "us-east-2"
618+
assert record.event_id == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
619+
assert record.event_name == "aws:kinesis:record"
620+
assert record.event_source == "aws:kinesis"
621+
assert record.event_source_arn == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
622+
assert record.event_version == "1.0"
623+
assert record.invoke_identity_arn == "arn:aws:iam::123456789012:role/lambda-role"
624+
625+
kinesis = record.kinesis
626+
assert kinesis._data["kinesis"] == event["Records"][0]["kinesis"]
627+
628+
assert kinesis.approximate_arrival_timestamp == 1545084650.987
629+
assert kinesis.data == event["Records"][0]["kinesis"]["data"]
630+
assert kinesis.kinesis_schema_version == "1.0"
631+
assert kinesis.partition_key == "1"
632+
assert kinesis.sequence_number == "49590338271490256608559692538361571095921575989136588898"
633+
634+
assert kinesis.data_as_text() == "Hello, this is a test."
635+
636+
637+
def test_kinesis_stream_event_json_data():
638+
json_value = {"test": "value"}
639+
data = base64.b64encode(bytes(json.dumps(json_value), "utf-8")).decode("utf-8")
640+
event = KinesisStreamEvent({"Records": [{"kinesis": {"data": data}}]})
641+
assert next(event.records).kinesis.data_as_json() == json_value

0 commit comments

Comments
 (0)