1
1
import json
2
2
from datetime import datetime
3
3
from functools import lru_cache
4
- from typing import Dict , List , Optional , Union
4
+ from typing import TYPE_CHECKING , Dict , List , Optional , Union
5
5
6
- from mypy_boto3_cloudwatch import type_defs
7
- from mypy_boto3_cloudwatch .client import CloudWatchClient
8
- from mypy_boto3_lambda .client import LambdaClient
9
- from mypy_boto3_xray .client import XRayClient
6
+ if TYPE_CHECKING :
7
+ from mypy_boto3_cloudwatch import MetricDataResultTypeDef
8
+ from mypy_boto3_cloudwatch .client import CloudWatchClient
9
+ from mypy_boto3_lambda .client import LambdaClient
10
+ from mypy_boto3_xray .client import XRayClient
11
+
12
+ import boto3
10
13
from pydantic import BaseModel
11
14
from retry import retry
12
15
@@ -33,14 +36,14 @@ class TraceSegment(BaseModel):
33
36
annotations : Dict = {}
34
37
35
38
36
- def trigger_lambda (lambda_arn : str , client : LambdaClient ):
37
- response = client . invoke ( FunctionName = lambda_arn , InvocationType = "RequestResponse " )
38
- return response
39
+ def trigger_lambda (lambda_arn : str , payload : str , client : Optional [ " LambdaClient" ] = None ):
40
+ client = client or boto3 . client ( "lambda " )
41
+ return client . invoke ( FunctionName = lambda_arn , InvocationType = "RequestResponse" , Payload = payload )
39
42
40
43
41
44
@lru_cache (maxsize = 10 , typed = False )
42
45
@retry (ValueError , delay = 1 , jitter = 1 , tries = 20 )
43
- def get_logs (lambda_function_name : str , log_client : CloudWatchClient , start_time : int , ** kwargs : dict ) -> List [Log ]:
46
+ def get_logs (lambda_function_name : str , log_client : " CloudWatchClient" , start_time : int , ** kwargs : dict ) -> List [Log ]:
44
47
response = log_client .filter_log_events (logGroupName = f"/aws/lambda/{ lambda_function_name } " , startTime = start_time )
45
48
if not response ["events" ]:
46
49
raise ValueError ("Empty response from Cloudwatch Logs. Repeating..." )
@@ -56,27 +59,27 @@ def get_logs(lambda_function_name: str, log_client: CloudWatchClient, start_time
56
59
57
60
58
61
@lru_cache (maxsize = 10 , typed = False )
59
- @retry (ValueError , delay = 1 , jitter = 1 , tries = 20 )
62
+ @retry (ValueError , delay = 1 , jitter = 1 , tries = 5 )
60
63
def get_metrics (
61
64
namespace : str ,
62
- cw_client : CloudWatchClient ,
63
65
start_date : datetime ,
64
66
metric_name : str ,
65
67
service_name : str ,
68
+ cw_client : Optional ["CloudWatchClient" ] = None ,
66
69
end_date : Optional [datetime ] = None ,
67
- ) -> type_defs .MetricDataResultTypeDef :
68
- response = cw_client .get_metric_data (
69
- MetricDataQueries = [
70
- {
71
- "Id" : "m1" ,
72
- "Expression" : f'SELECT MAX("{ metric_name } ") from SCHEMA("{ namespace } ",service) \
70
+ ) -> "MetricDataResultTypeDef" :
71
+ cw_client = cw_client or boto3 .client ("cloudwatch" )
72
+ metric_query = {
73
+ "Id" : "m1" ,
74
+ "Expression" : f'SELECT MAX("{ metric_name } ") from SCHEMA("{ namespace } ",service) \
73
75
where service=\' { service_name } \' ' ,
74
- "ReturnData" : True ,
75
- "Period" : 600 ,
76
- },
77
- ],
76
+ "ReturnData" : True ,
77
+ "Period" : 600 ,
78
+ }
79
+ response = cw_client .get_metric_data (
80
+ MetricDataQueries = [metric_query ],
78
81
StartTime = start_date ,
79
- EndTime = end_date if end_date else datetime .utcnow (),
82
+ EndTime = end_date or datetime .utcnow (),
80
83
)
81
84
result = response ["MetricDataResults" ][0 ]
82
85
if not result ["Values" ]:
@@ -85,7 +88,7 @@ def get_metrics(
85
88
86
89
87
90
@retry (ValueError , delay = 1 , jitter = 1 , tries = 10 )
88
- def get_traces (filter_expression : str , xray_client : XRayClient , start_date : datetime , end_date : datetime ) -> Dict :
91
+ def get_traces (filter_expression : str , xray_client : " XRayClient" , start_date : datetime , end_date : datetime ) -> Dict :
89
92
paginator = xray_client .get_paginator ("get_trace_summaries" )
90
93
response_iterator = paginator .paginate (
91
94
StartTime = start_date ,
0 commit comments