refactor(e2e): support fail fast in get_lambda_response #2912


Merged
26 changes: 18 additions & 8 deletions tests/e2e/idempotency/test_idempotency_dynamodb.py
@@ -4,7 +4,7 @@
import pytest

from tests.e2e.utils import data_fetcher
from tests.e2e.utils.functions import execute_lambdas_in_parallel
from tests.e2e.utils.data_fetcher.common import GetLambdaResponseOptions, get_lambda_response_in_parallel


@pytest.fixture
@@ -73,14 +73,21 @@ def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn:
@pytest.mark.xdist_group(name="idempotency")
def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
# GIVEN
payload_timeout_execution = json.dumps({"sleep": 5, "message": "Powertools for AWS Lambda (Python) - TTL 1s"})
payload_working_execution = json.dumps({"sleep": 0, "message": "Powertools for AWS Lambda (Python) - TTL 1s"})
payload_timeout_execution = json.dumps(
{"sleep": 5, "message": "Powertools for AWS Lambda (Python) - TTL 1s"},
sort_keys=True,
)
payload_working_execution = json.dumps(
{"sleep": 0, "message": "Powertools for AWS Lambda (Python) - TTL 1s"},
sort_keys=True,
)

# WHEN
# first call should fail due to timeout
execution_with_timeout, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_timeout_handler_fn_arn,
payload=payload_timeout_execution,
raise_on_error=False,
)
execution_with_timeout_response = execution_with_timeout["Payload"].read().decode("utf-8")

@@ -99,12 +106,15 @@ def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
@pytest.mark.xdist_group(name="idempotency")
def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
# GIVEN
arguments = json.dumps({"message": "Powertools for AWS Lambda (Python) - Parallel execution"})
payload = json.dumps({"message": "Powertools for AWS Lambda (Python) - Parallel execution"})

# WHEN
# executing Lambdas in parallel
lambdas_arn = [parallel_execution_handler_fn_arn, parallel_execution_handler_fn_arn]
execution_result_list = execute_lambdas_in_parallel("data_fetcher.get_lambda_response", lambdas_arn, arguments)
invocation_options = [
GetLambdaResponseOptions(lambda_arn=parallel_execution_handler_fn_arn, payload=payload, raise_on_error=False),
GetLambdaResponseOptions(lambda_arn=parallel_execution_handler_fn_arn, payload=payload, raise_on_error=False),
]

# WHEN executing Lambdas in parallel
execution_result_list = get_lambda_response_in_parallel(invocation_options)

timeout_execution_response = execution_result_list[0][0]["Payload"].read().decode("utf-8")
error_idempotency_execution_response = execution_result_list[1][0]["Payload"].read().decode("utf-8")
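
Note (not part of the diff): with this refactor, get_lambda_response fails fast by default — an unhandled function error raises RuntimeError with the error payload, and tests that need to inspect the failure opt out with raise_on_error=False. A minimal sketch of both modes, assuming a hypothetical failing_handler_fn_arn fixture whose function always raises:

import json

import pytest

from tests.e2e.utils import data_fetcher


def test_fail_fast_modes(failing_handler_fn_arn: str):
    payload = json.dumps({"message": "trigger failure"})

    # Default behaviour: an unhandled function error raises immediately
    with pytest.raises(RuntimeError):
        data_fetcher.get_lambda_response(lambda_arn=failing_handler_fn_arn, payload=payload)

    # Opt out when the test asserts on the error payload itself
    response, _ = data_fetcher.get_lambda_response(
        lambda_arn=failing_handler_fn_arn,
        payload=payload,
        raise_on_error=False,
    )
    error_payload = response["Payload"].read().decode("utf-8")
    assert "Error" in error_payload  # illustrative assertion only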
2 changes: 1 addition & 1 deletion tests/e2e/utils/data_builder/traces.py
@@ -2,7 +2,7 @@


def build_trace_default_query(function_name: str) -> str:
return f'service("{function_name}")'
return f'service(id(name: "{function_name}"))'


def build_put_annotations_input(**annotations: str) -> List[Dict]:
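
Note (not part of the diff): the corrected string follows X-Ray's filter expression grammar for matching a service node by name. A rough usage sketch of how such a query is typically passed to the X-Ray API (client setup and time window are illustrative assumptions):

from datetime import datetime, timedelta

import boto3

from tests.e2e.utils.data_builder.traces import build_trace_default_query

xray_client = boto3.client("xray")
now = datetime.utcnow()

# e.g. 'service(id(name: "my-function"))'
filter_expression = build_trace_default_query(function_name="my-function")

trace_summaries = xray_client.get_trace_summaries(
    StartTime=now - timedelta(minutes=5),
    EndTime=now,
    FilterExpression=filter_expression,
)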
94 changes: 91 additions & 3 deletions tests/e2e/utils/data_fetcher/common.py
@@ -1,24 +1,76 @@
import functools
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from typing import Optional, Tuple
from typing import List, Optional, Tuple

import boto3
import requests
from mypy_boto3_lambda import LambdaClient
from mypy_boto3_lambda.type_defs import InvocationResponseTypeDef
from pydantic import BaseModel
from requests import Request, Response
from requests.exceptions import RequestException
from retry import retry

GetLambdaResponse = Tuple[InvocationResponseTypeDef, datetime]


class GetLambdaResponseOptions(BaseModel):
lambda_arn: str
payload: Optional[str] = None
client: Optional[LambdaClient] = None
raise_on_error: bool = True

# Maintenance: Pydantic v2 deprecated it; we should update in v3
class Config:
arbitrary_types_allowed = True


def get_lambda_response(
lambda_arn: str,
payload: Optional[str] = None,
client: Optional[LambdaClient] = None,
) -> Tuple[InvocationResponseTypeDef, datetime]:
raise_on_error: bool = True,
) -> GetLambdaResponse:
"""Invoke function synchronously

Parameters
----------
lambda_arn : str
Lambda function ARN to invoke
payload : Optional[str], optional
JSON payload for Lambda invocation, by default None
client : Optional[LambdaClient], optional
Boto3 Lambda SDK client, by default None
raise_on_error : bool, optional
Whether to raise exception upon invocation error, by default True

Returns
-------
Tuple[InvocationResponseTypeDef, datetime]
Function response and approximate execution time

Raises
------
RuntimeError
Function invocation error details
"""
client = client or boto3.client("lambda")
payload = payload or ""
execution_time = datetime.utcnow()
return client.invoke(FunctionName=lambda_arn, InvocationType="RequestResponse", Payload=payload), execution_time
response: InvocationResponseTypeDef = client.invoke(
FunctionName=lambda_arn,
InvocationType="RequestResponse",
Payload=payload,
)

has_error = response.get("FunctionError", "") == "Unhandled"
if has_error and raise_on_error:
error_payload = response["Payload"].read().decode()
raise RuntimeError(f"Function failed invocation: {error_payload}")

return response, execution_time


@retry(RequestException, delay=2, jitter=1.5, tries=5)
@@ -27,3 +79,39 @@ def get_http_response(request: Request) -> Response:
result = session.send(request.prepare())
result.raise_for_status()
return result


def get_lambda_response_in_parallel(
get_lambda_response_options: List[GetLambdaResponseOptions],
) -> List[GetLambdaResponse]:
"""Invoke functions in parallel

Parameters
----------
get_lambda_response_options : List[GetLambdaResponseOptions]
List of options to call get_lambda_response with

Returns
-------
List[GetLambdaResponse]
Function responses and approximate execution time
"""
result_list = []
with ThreadPoolExecutor() as executor:
running_tasks: List[Future] = []
for options in get_lambda_response_options:
# Sleep 0.5, 1, 1.5, ... seconds between each invocation. This way
# we can guarantee that lambdas are executed in parallel, but they are
# called in the same "order" as they are passed in, thus guaranteeing that
# we can assert on the correct output.
time.sleep(0.5 * len(running_tasks))

get_lambda_response_callback = functools.partial(get_lambda_response, **options.dict())
running_tasks.append(
executor.submit(get_lambda_response_callback),
)

executor.shutdown(wait=True)
result_list.extend(running_task.result() for running_task in running_tasks)

return result_list
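
Note (not part of the diff): a caller builds one GetLambdaResponseOptions per invocation and hands the list to get_lambda_response_in_parallel, which returns results in the same order they were submitted. A short usage sketch with placeholder ARN and payloads:

from tests.e2e.utils.data_fetcher.common import (
    GetLambdaResponseOptions,
    get_lambda_response_in_parallel,
)

options = [
    GetLambdaResponseOptions(
        lambda_arn="arn:aws:lambda:<region>:<account>:function:example",
        payload='{"call": 1}',
    ),
    GetLambdaResponseOptions(
        lambda_arn="arn:aws:lambda:<region>:<account>:function:example",
        payload='{"call": 2}',
        raise_on_error=False,  # inspect the failure instead of raising
    ),
]

results = get_lambda_response_in_parallel(options)
for invocation_response, started_at in results:
    print(started_at.isoformat(), invocation_response["Payload"].read().decode("utf-8"))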
32 changes: 0 additions & 32 deletions tests/e2e/utils/functions.py

This file was deleted.