1
1
import json
2
+ import secrets
2
3
from datetime import datetime
3
4
from functools import lru_cache
4
- from typing import TYPE_CHECKING , Dict , List , Optional , Union
5
-
6
- if TYPE_CHECKING :
7
- from mypy_boto3_cloudwatch import MetricDataResultTypeDef
8
- from mypy_boto3_cloudwatch .client import CloudWatchClient
9
- from mypy_boto3_lambda .client import LambdaClient
10
- from mypy_boto3_xray .client import XRayClient
5
+ from typing import Dict , List , Optional , Tuple , Union
11
6
12
7
import boto3
8
+ from mypy_boto3_cloudwatch .client import CloudWatchClient
9
+ from mypy_boto3_cloudwatch .type_defs import DimensionTypeDef , MetricDataQueryTypeDef , MetricDataResultTypeDef
10
+ from mypy_boto3_lambda .client import LambdaClient
11
+ from mypy_boto3_lambda .type_defs import InvocationResponseTypeDef
12
+ from mypy_boto3_xray .client import XRayClient
13
13
from pydantic import BaseModel
14
14
from retry import retry
15
15
@@ -36,14 +36,17 @@ class TraceSegment(BaseModel):
36
36
annotations : Dict = {}
37
37
38
38
39
- def trigger_lambda (lambda_arn : str , payload : str , client : Optional ["LambdaClient" ] = None ):
39
+ def trigger_lambda (
40
+ lambda_arn : str , payload : str , client : Optional [LambdaClient ] = None
41
+ ) -> Tuple [InvocationResponseTypeDef , datetime ]:
40
42
client = client or boto3 .client ("lambda" )
41
- return client .invoke (FunctionName = lambda_arn , InvocationType = "RequestResponse" , Payload = payload )
43
+ execution_time = datetime .utcnow ()
44
+ return client .invoke (FunctionName = lambda_arn , InvocationType = "RequestResponse" , Payload = payload ), execution_time
42
45
43
46
44
47
@lru_cache (maxsize = 10 , typed = False )
45
48
@retry (ValueError , delay = 1 , jitter = 1 , tries = 20 )
46
- def get_logs (lambda_function_name : str , log_client : " CloudWatchClient" , start_time : int , ** kwargs : dict ) -> List [Log ]:
49
+ def get_logs (lambda_function_name : str , log_client : CloudWatchClient , start_time : int , ** kwargs : dict ) -> List [Log ]:
47
50
response = log_client .filter_log_events (logGroupName = f"/aws/lambda/{ lambda_function_name } " , startTime = start_time )
48
51
if not response ["events" ]:
49
52
raise ValueError ("Empty response from Cloudwatch Logs. Repeating..." )
@@ -58,26 +61,24 @@ def get_logs(lambda_function_name: str, log_client: "CloudWatchClient", start_ti
58
61
return filtered_logs
59
62
60
63
61
- @lru_cache (maxsize = 10 , typed = False )
62
- @retry (ValueError , delay = 1 , jitter = 1 , tries = 5 )
64
+ @retry (ValueError , delay = 1 , jitter = 1 , tries = 10 )
63
65
def get_metrics (
64
66
namespace : str ,
65
67
start_date : datetime ,
66
68
metric_name : str ,
67
69
service_name : str ,
68
- cw_client : Optional [" CloudWatchClient" ] = None ,
70
+ cw_client : Optional [CloudWatchClient ] = None ,
69
71
end_date : Optional [datetime ] = None ,
70
- ) -> "MetricDataResultTypeDef" :
72
+ period : int = 60 ,
73
+ stat : str = "Sum" ,
74
+ ) -> MetricDataResultTypeDef :
71
75
cw_client = cw_client or boto3 .client ("cloudwatch" )
72
- metric_query = {
73
- "Id" : "m1" ,
74
- "Expression" : f'SELECT MAX("{ metric_name } ") from SCHEMA("{ namespace } ",service) \
75
- where service=\' { service_name } \' ' ,
76
- "ReturnData" : True ,
77
- "Period" : 600 ,
78
- }
76
+ metric_query = build_metric_query_data (
77
+ namespace = namespace , metric_name = metric_name , service_name = service_name , period = period , stat = stat
78
+ )
79
+
79
80
response = cw_client .get_metric_data (
80
- MetricDataQueries = [ metric_query ] ,
81
+ MetricDataQueries = metric_query ,
81
82
StartTime = start_date ,
82
83
EndTime = end_date or datetime .utcnow (),
83
84
)
@@ -88,7 +89,10 @@ def get_metrics(
88
89
89
90
90
91
@retry (ValueError , delay = 1 , jitter = 1 , tries = 10 )
91
- def get_traces (filter_expression : str , xray_client : "XRayClient" , start_date : datetime , end_date : datetime ) -> Dict :
92
+ def get_traces (
93
+ filter_expression : str , start_date : datetime , end_date : datetime , xray_client : Optional [XRayClient ] = None
94
+ ) -> Dict :
95
+ xray_client = xray_client or boto3 .client ("xray" )
92
96
paginator = xray_client .get_paginator ("get_trace_summaries" )
93
97
response_iterator = paginator .paginate (
94
98
StartTime = start_date ,
@@ -132,3 +136,41 @@ def find_meta(segment: dict, result: List):
132
136
)
133
137
if x_subsegment .get ("subsegments" ):
134
138
find_meta (segment = x_subsegment , result = result )
139
+
140
+
141
+ # Maintenance: Build a separate module for builders
142
+ def build_metric_name () -> str :
143
+ return f"test_metric{ build_random_value ()} "
144
+
145
+
146
+ def build_service_name () -> str :
147
+ return f"test_service{ build_random_value ()} "
148
+
149
+
150
+ def build_random_value (nbytes : int = 10 ) -> str :
151
+ return secrets .token_urlsafe (nbytes ).replace ("-" , "" )
152
+
153
+
154
+ def build_metric_query_data (
155
+ namespace : str ,
156
+ metric_name : str ,
157
+ service_name : str ,
158
+ period : int = 60 ,
159
+ stat : str = "Sum" ,
160
+ dimensions : Optional [DimensionTypeDef ] = None ,
161
+ ) -> List [MetricDataQueryTypeDef ]:
162
+ metric_dimensions : List [DimensionTypeDef ] = [{"Name" : "service" , "Value" : service_name }]
163
+ if dimensions is not None :
164
+ metric_dimensions .append (dimensions )
165
+
166
+ return [
167
+ {
168
+ "Id" : metric_name ,
169
+ "MetricStat" : {
170
+ "Metric" : {"Namespace" : namespace , "MetricName" : metric_name , "Dimensions" : metric_dimensions },
171
+ "Period" : period ,
172
+ "Stat" : stat ,
173
+ },
174
+ "ReturnData" : True ,
175
+ }
176
+ ]
0 commit comments