feat(parser): add KinesisFirehoseModel #1556
Merged
leandrodamascena
merged 7 commits into
aws-powertools:develop
from
ran-isenberg:firehose
Oct 10, 2022
229bd6f
Feature request: Kinesis Data Firehose event envelope and parser model
ea74092
Merge branch 'develop' into firehose
ran-isenberg 2512ab9
Merge branch 'develop' into firehose
ran-isenberg f95afa8
Merge branch 'develop' into firehose
ran-isenberg 0b48268
feat(parser): adjusts in kinesis firehose parser
leandrodamascena 114caa1
Merge branch 'develop' into firehose
ran-isenberg ca73ecd
add positive int and fix develop conflict
47 changes: 47 additions & 0 deletions
aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import logging | ||
from typing import Any, Dict, List, Optional, Type, Union, cast | ||
|
||
from ..models import KinesisFirehoseModel | ||
from ..types import Model | ||
from .base import BaseEnvelope | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class KinesisFirehoseEnvelope(BaseEnvelope): | ||
"""Kinesis Firehose Envelope to extract array of Records | ||
|
||
The record's data parameter is a base64 encoded string which is parsed into a bytes array, | ||
though it can also be a JSON encoded string. | ||
Regardless of its type it'll be parsed into a BaseModel object. | ||
|
||
Note: all records are parsed with the same model, so if model is str, | ||
every item in the list is parsed as str rather than as JSON (and vice versa) | ||
|
||
https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html | ||
""" | ||
|
||
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: | ||
"""Parses records found with model provided | ||
|
||
Parameters | ||
---------- | ||
data : Dict | ||
Lambda event to be parsed | ||
model : Type[Model] | ||
Data model provided to parse after extracting data using envelope | ||
|
||
Returns | ||
------- | ||
List | ||
List of records parsed with model provided | ||
""" | ||
logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}") | ||
parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data) | ||
logger.debug(f"Parsing Kinesis Firehose records in `data` with {model}") | ||
models = [] | ||
for record in parsed_envelope.records: | ||
# We allow either AWS expected contract (bytes) or a custom Model, see #943 | ||
data = cast(bytes, record.data) | ||
models.append(self._parse(data=data.decode("utf-8"), model=model)) | ||
return models |
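
The loop in `parse` can be illustrated with a standard-library sketch. This is not the Powertools implementation: `HelloPayload` and `parse_firehose_records` are hypothetical names, a dataclass stands in for the pydantic model, and the sketch assumes every record carries a JSON object. It only shows the order of operations the envelope relies on: base64 text to bytes, bytes to UTF-8 text, text to model.

```python
import base64
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class HelloPayload:
    """Hypothetical business model; stands in for the `model` argument."""

    Hello: str


def parse_firehose_records(event: Dict[str, Any]) -> List[HelloPayload]:
    """Mimic the envelope loop: decode each record's data, then build the model."""
    parsed = []
    for record in event["records"]:
        raw = base64.b64decode(record["data"])  # base64 str -> bytes
        payload = json.loads(raw.decode("utf-8"))  # bytes -> str -> dict
        parsed.append(HelloPayload(**payload))
    return parsed


# Second record of the fixture used in this pull request.
event = {"records": [{"recordId": "record2", "data": "eyJIZWxsbyI6ICJXb3JsZCJ9"}]}
result = parse_firehose_records(event)
print(result)
```

Because one model is applied to every record, a batch that mixes plain-text and JSON payloads would fail on the first non-JSON record in this sketch.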
32 changes: 32 additions & 0 deletions
aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
from typing import List, Optional, Type, Union | ||
|
||
from pydantic import BaseModel, PositiveInt, validator | ||
|
||
from aws_lambda_powertools.shared.functions import base64_decode | ||
|
||
|
||
class KinesisFirehoseRecordMetadata(BaseModel): | ||
shardId: str | ||
partitionKey: str | ||
approximateArrivalTimestamp: PositiveInt | ||
sequenceNumber: str | ||
subsequenceNumber: str | ||
|
||
|
||
class KinesisFirehoseRecord(BaseModel): | ||
data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes | ||
recordId: str | ||
approximateArrivalTimestamp: PositiveInt | ||
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] | ||
|
||
@validator("data", pre=True, allow_reuse=True) | ||
def data_base64_decode(cls, value): | ||
return base64_decode(value) | ||
|
||
|
||
class KinesisFirehoseModel(BaseModel): | ||
invocationId: str | ||
deliveryStreamArn: str | ||
region: str | ||
sourceKinesisStreamArn: Optional[str] | ||
records: List[KinesisFirehoseRecord] |
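
The two checks that `KinesisFirehoseRecord` adds on top of plain typing (a positive timestamp and a base64-decoded `data` field) can be approximated without pydantic. The following is a rough sketch under that assumption: `RecordSketch` and `from_dict` are hypothetical names, and the sketch does not replicate pydantic's type coercion or its `ValidationError`.

```python
import base64
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class RecordSketch:
    """Hypothetical stand-in for KinesisFirehoseRecord (not the pydantic model)."""

    recordId: str
    approximateArrivalTimestamp: int
    data: bytes

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "RecordSketch":
        timestamp = raw["approximateArrivalTimestamp"]
        # Same intent as PositiveInt: reject zero and negative values.
        if not isinstance(timestamp, int) or timestamp <= 0:
            raise ValueError("approximateArrivalTimestamp must be a positive integer")
        try:
            # Same intent as the `data` validator: base64 text becomes bytes.
            decoded = base64.b64decode(raw["data"], validate=True)
        except (TypeError, ValueError) as exc:
            raise ValueError("data must be a base64 encoded string") from exc
        return cls(raw["recordId"], timestamp, decoded)


record = RecordSketch.from_dict(
    {"recordId": "record1", "approximateArrivalTimestamp": 1664028820148, "data": "SGVsbG8gV29ybGQ="}
)
print(record.data)
```

A negative timestamp or a non-string `data` value raises `ValueError` here, which mirrors the failure cases the tests in this pull request exercise against the real model.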
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
{ | ||
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", | ||
"sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source", | ||
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", | ||
"region": "us-east-2", | ||
"records": [ | ||
{ | ||
"data": "SGVsbG8gV29ybGQ=", | ||
"recordId": "record1", | ||
"approximateArrivalTimestamp": 1664028820148, | ||
"kinesisRecordMetadata": { | ||
"shardId": "shardId-000000000000", | ||
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", | ||
"approximateArrivalTimestamp": 1664028820148, | ||
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154", | ||
"subsequenceNumber": "" | ||
} | ||
}, | ||
{ | ||
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9", | ||
"recordId": "record2", | ||
"approximateArrivalTimestamp": 1664028793294, | ||
"kinesisRecordMetadata": { | ||
"shardId": "shardId-000000000001", | ||
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", | ||
"approximateArrivalTimestamp": 1664028793294, | ||
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155", | ||
"subsequenceNumber": "" | ||
} | ||
} | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
{ | ||
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", | ||
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", | ||
"region": "us-east-2", | ||
"records":[ | ||
{ | ||
"recordId":"record1", | ||
"approximateArrivalTimestamp":1664029185290, | ||
"data":"SGVsbG8gV29ybGQ=" | ||
}, | ||
{ | ||
"recordId":"record2", | ||
"approximateArrivalTimestamp":1664029186945, | ||
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9" | ||
} | ||
] | ||
} |
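
The `data` values in both fixtures are base64 encodings of `Hello World` and of a small JSON object. A short sketch of how such records could be produced for additional test events follows; `encode_record` is a hypothetical helper and is not part of this pull request.

```python
import base64
import json
from typing import Any, Dict, Union


def encode_record(record_id: str, payload: Union[str, Dict[str, Any]], timestamp_ms: int) -> Dict[str, Any]:
    """Build one Firehose-style record with a base64 encoded `data` field."""
    # Dict payloads are serialized to JSON first; plain strings are used as-is.
    text = payload if isinstance(payload, str) else json.dumps(payload)
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return {
        "recordId": record_id,
        "approximateArrivalTimestamp": timestamp_ms,
        "data": encoded,
    }


records = [
    encode_record("record1", "Hello World", 1664029185290),
    encode_record("record2", {"Hello": "World"}, 1664029186945),
]
print(json.dumps(records, indent=2))
```

With the default `json.dumps` separators, the two records reproduce the `data` strings used in the fixtures above.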
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
from typing import List | ||
|
||
import pytest | ||
|
||
from aws_lambda_powertools.utilities.parser import ( | ||
ValidationError, | ||
envelopes, | ||
event_parser, | ||
) | ||
from aws_lambda_powertools.utilities.parser.models import ( | ||
KinesisFirehoseModel, | ||
KinesisFirehoseRecord, | ||
KinesisFirehoseRecordMetadata, | ||
) | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
from tests.functional.parser.schemas import MyKinesisFirehoseBusiness | ||
from tests.functional.utils import load_event | ||
|
||
|
||
@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope) | ||
def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext): | ||
assert len(event) == 1 | ||
assert event[0].Hello == "World" | ||
|
||
|
||
@event_parser(model=KinesisFirehoseModel) | ||
def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext): | ||
assert event.region == "us-east-2" | ||
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" | ||
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" | ||
assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source" | ||
|
||
records = list(event.records) | ||
assert len(records) == 2 | ||
record_01: KinesisFirehoseRecord = records[0] | ||
assert record_01.approximateArrivalTimestamp == 1664028820148 | ||
assert record_01.recordId == "record1" | ||
assert record_01.data == b"Hello World" | ||
|
||
metadata_01: KinesisFirehoseRecordMetadata = record_01.kinesisRecordMetadata | ||
assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a" | ||
assert metadata_01.subsequenceNumber == "" | ||
assert metadata_01.shardId == "shardId-000000000000" | ||
assert metadata_01.approximateArrivalTimestamp == 1664028820148 | ||
assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154" | ||
|
||
record_02: KinesisFirehoseRecord = records[1] | ||
assert record_02.approximateArrivalTimestamp == 1664028793294 | ||
assert record_02.recordId == "record2" | ||
assert record_02.data == b'{"Hello": "World"}' | ||
|
||
metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata | ||
assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a" | ||
assert metadata_02.subsequenceNumber == "" | ||
assert metadata_02.shardId == "shardId-000000000001" | ||
assert metadata_02.approximateArrivalTimestamp == 1664028793294 | ||
assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155" | ||
|
||
|
||
@event_parser(model=KinesisFirehoseModel) | ||
def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext): | ||
|
||
assert event.region == "us-east-2" | ||
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" | ||
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" | ||
|
||
records = list(event.records) | ||
assert len(records) == 2 | ||
|
||
record_01: KinesisFirehoseRecord = records[0] | ||
assert record_01.approximateArrivalTimestamp == 1664029185290 | ||
assert record_01.recordId == "record1" | ||
assert record_01.data == b"Hello World" | ||
|
||
record_02: KinesisFirehoseRecord = records[1] | ||
assert record_02.approximateArrivalTimestamp == 1664029186945 | ||
assert record_02.recordId == "record2" | ||
assert record_02.data == b'{"Hello": "World"}' | ||
|
||
|
||
def test_firehose_trigger_event(): | ||
event_dict = load_event("kinesisFirehoseKinesisEvent.json") | ||
event_dict["records"].pop(0)  # drop the first record: its payload is plain text, and this test exercises the JSON payload model | ||
handle_firehose(event_dict, LambdaContext()) | ||
|
||
|
||
def test_firehose_trigger_event_kinesis_no_envelope(): | ||
event_dict = load_event("kinesisFirehoseKinesisEvent.json") | ||
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) | ||
|
||
|
||
def test_firehose_trigger_event_put_no_envelope(): | ||
event_dict = load_event("kinesisFirehosePutEvent.json") | ||
handle_firehose_no_envelope_put(event_dict, LambdaContext()) | ||
|
||
|
||
def test_kinesis_trigger_bad_base64_event(): | ||
event_dict = load_event("kinesisFirehoseKinesisEvent.json") | ||
event_dict["records"][0]["data"] = {"bad base64"} | ||
with pytest.raises(ValidationError): | ||
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) | ||
|
||
|
||
def test_kinesis_trigger_bad_timestamp_event(): | ||
event_dict = load_event("kinesisFirehoseKinesisEvent.json") | ||
event_dict["records"][0]["approximateArrivalTimestamp"] = -1 | ||
with pytest.raises(ValidationError): | ||
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) | ||
|
||
|
||
def test_kinesis_trigger_bad_metadata_timestamp_event(): | ||
event_dict = load_event("kinesisFirehoseKinesisEvent.json") | ||
event_dict["records"][0]["kinesisRecordMetadata"]["approximateArrivalTimestamp"] = "-1" | ||
with pytest.raises(ValidationError): | ||
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) |
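
`test_firehose_trigger_event` removes the first fixture record before calling the envelope handler. A standard-library check of the two fixture payloads suggests why: only the second one decodes to a JSON object, so a JSON-backed model would presumably reject the first. `is_json_object` is a hypothetical helper used only for this illustration.

```python
import base64
import json

# `data` values copied from the fixtures in this pull request.
FIXTURE_DATA = ["SGVsbG8gV29ybGQ=", "eyJIZWxsbyI6ICJXb3JsZCJ9"]


def is_json_object(encoded: str) -> bool:
    """Return True when the base64 payload decodes to a JSON object."""
    text = base64.b64decode(encoded).decode("utf-8")
    try:
        return isinstance(json.loads(text), dict)
    except json.JSONDecodeError:
        return False


flags = [is_json_object(item) for item in FIXTURE_DATA]
print(flags)
```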