-
Notifications
You must be signed in to change notification settings - Fork 432
feat(parser): add support to VpcLatticeModel #2584
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
rubenfonseca
merged 5 commits into
aws-powertools:develop
from
leandrodamascena:parser/vpclattice
Jun 27, 2023
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
7650c27
feature: adding vpc lattice parser and envelope
leandrodamascena 9a7ed41
fix: renamed VPC into Vpc
rubenfonseca f1c2abd
Empty-Commit
rubenfonseca 71d9b59
fix: changing returns
leandrodamascena 430b707
fix: removing debug
leandrodamascena File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
aws_lambda_powertools/utilities/parser/envelopes/vpc_lattice.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import logging | ||
from typing import Any, Dict, Optional, Type, Union | ||
|
||
from ..models import VpcLatticeModel | ||
from ..types import Model | ||
from .base import BaseEnvelope | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class VpcLatticeEnvelope(BaseEnvelope): | ||
"""Amazon VPC Lattice envelope to extract data within body key""" | ||
|
||
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: | ||
"""Parses data found with model provided | ||
|
||
Parameters | ||
---------- | ||
data : Dict | ||
Lambda event to be parsed | ||
model : Type[Model] | ||
Data model provided to parse after extracting data using envelope | ||
|
||
Returns | ||
------- | ||
Any | ||
Parsed detail payload with model provided | ||
""" | ||
logger.debug(f"Parsing incoming data with VPC Lattice model {VpcLatticeModel}") | ||
parsed_envelope: VpcLatticeModel = VpcLatticeModel.parse_obj(data) | ||
print(parsed_envelope.body) | ||
leandrodamascena marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.debug(f"Parsing event payload in `detail` with {model}") | ||
return self._parse(data=parsed_envelope.body, model=model) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
12 changes: 12 additions & 0 deletions
12
aws_lambda_powertools/utilities/parser/models/vpc_lattice.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from typing import Dict, Type, Union | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class VpcLatticeModel(BaseModel): | ||
method: str | ||
raw_path: str | ||
body: Union[str, Type[BaseModel]] | ||
is_base64_encoded: bool | ||
headers: Dict[str, str] | ||
query_string_parameters: Dict[str, str] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import pytest | ||
|
||
from aws_lambda_powertools.utilities.parser import ( | ||
ValidationError, | ||
envelopes, | ||
event_parser, | ||
) | ||
from aws_lambda_powertools.utilities.parser.models import VpcLatticeModel | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
from tests.functional.parser.schemas import MyVpcLatticeBusiness | ||
from tests.functional.utils import load_event | ||
|
||
|
||
@event_parser(model=MyVpcLatticeBusiness, envelope=envelopes.VpcLatticeEnvelope) | ||
def handle_lambda_vpclattice_with_envelope(event: MyVpcLatticeBusiness, context: LambdaContext): | ||
assert event.username == "Leandro" | ||
assert event.name == "Damascena" | ||
|
||
|
||
def test_vpc_lattice_event_with_envelope(): | ||
event = load_event("vpcLatticeEvent.json") | ||
event["body"] = '{"username": "Leandro", "name": "Damascena"}' | ||
handle_lambda_vpclattice_with_envelope(event, LambdaContext()) | ||
|
||
|
||
def test_vpc_lattice_event(): | ||
raw_event = load_event("vpcLatticeEvent.json") | ||
model = VpcLatticeModel(**raw_event) | ||
|
||
assert model.body == raw_event["body"] | ||
assert model.method == raw_event["method"] | ||
assert model.raw_path == raw_event["raw_path"] | ||
assert model.is_base64_encoded == raw_event["is_base64_encoded"] | ||
assert model.headers == raw_event["headers"] | ||
assert model.query_string_parameters == raw_event["query_string_parameters"] | ||
|
||
|
||
def test_vpc_lattice_event_custom_model(): | ||
class MyCustomResource(VpcLatticeModel): | ||
body: str | ||
|
||
raw_event = load_event("vpcLatticeEvent.json") | ||
model = MyCustomResource(**raw_event) | ||
|
||
assert model.body == raw_event["body"] | ||
|
||
|
||
def test_vpc_lattice_event_invalid(): | ||
raw_event = load_event("vpcLatticeEvent.json") | ||
raw_event["body"] = ["some_data"] | ||
|
||
with pytest.raises(ValidationError): | ||
VpcLatticeModel(**raw_event) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.