Skip to content

Commit 2660af4

Browse files
authored
Added support for more than 100 metrics (#31)
* Added support for more than 100 metrics * Added unit tests * Changed serializer to reset the body between batches
1 parent b6c4f22 commit 2660af4

File tree

8 files changed

+166
-27
lines changed

8 files changed

+166
-27
lines changed

aws_embedded_metrics/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@
1313

1414
DEFAULT_NAMESPACE = "aws-embedded-metrics"
1515
MAX_DIMENSIONS = 9
16+
MAX_METRICS_PER_EVENT = 100

aws_embedded_metrics/serializers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
import abc
1515
from aws_embedded_metrics.logger.metrics_context import MetricsContext
16+
from typing import List
1617

1718

1819
class Serializer(abc.ABC):
1920
@staticmethod
2021
@abc.abstractmethod
21-
def serialize(context: MetricsContext) -> str:
22+
def serialize(context: MetricsContext) -> List[str]:
2223
"""Flushes the metrics context to the sink."""

aws_embedded_metrics/serializers/log_serializer.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313

1414
from aws_embedded_metrics.logger.metrics_context import MetricsContext
1515
from aws_embedded_metrics.serializers import Serializer
16-
from aws_embedded_metrics.constants import MAX_DIMENSIONS
16+
from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
1717
import json
1818
from typing import Any, Dict, List
1919

2020

2121
class LogSerializer(Serializer):
2222
@staticmethod
23-
def serialize(context: MetricsContext) -> str:
23+
def serialize(context: MetricsContext) -> List[str]:
2424
dimension_keys = []
2525
dimensions_properties: Dict[str, str] = {}
2626

@@ -29,28 +29,40 @@ def serialize(context: MetricsContext) -> str:
2929
dimension_keys.append(keys[0:MAX_DIMENSIONS])
3030
dimensions_properties = {**dimensions_properties, **dimension_set}
3131

32-
metric_pointers: List[Dict[str, str]] = []
32+
def create_body() -> Dict[str, Any]:
33+
return {
34+
**dimensions_properties,
35+
**context.properties,
36+
"_aws": {
37+
**context.meta,
38+
"CloudWatchMetrics": [
39+
{
40+
"Dimensions": dimension_keys,
41+
"Metrics": [],
42+
"Namespace": context.namespace,
43+
},
44+
],
45+
},
46+
}
3347

34-
metric_definitions = {
35-
"Dimensions": dimension_keys,
36-
"Metrics": metric_pointers,
37-
"Namespace": context.namespace,
38-
}
39-
cloud_watch_metrics = [metric_definitions]
40-
41-
body: Dict[str, Any] = {
42-
**dimensions_properties,
43-
**context.properties,
44-
"_aws": {**context.meta, "CloudWatchMetrics": cloud_watch_metrics},
45-
}
48+
current_body: Dict[str, Any] = create_body()
49+
event_batches: List[str] = []
4650

4751
for metric_name, metric in context.metrics.items():
4852

4953
if len(metric.values) == 1:
50-
body[metric_name] = metric.values[0]
54+
current_body[metric_name] = metric.values[0]
5155
else:
52-
body[metric_name] = metric.values
56+
current_body[metric_name] = metric.values
57+
58+
current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
59+
60+
should_serialize: bool = len(current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"]) == MAX_METRICS_PER_EVENT
61+
if should_serialize:
62+
event_batches.append(json.dumps(current_body))
63+
current_body = create_body()
5364

54-
metric_pointers.append({"Name": metric_name, "Unit": metric.unit})
65+
if not event_batches or current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"]:
66+
event_batches.append(json.dumps(current_body))
5567

56-
return json.dumps(body)
68+
return event_batches

aws_embedded_metrics/sinks/agent_sink.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,15 @@ def accept(self, context: MetricsContext) -> None:
6666
if self.log_steam_name is not None:
6767
context.meta["LogStreamName"] = self.log_steam_name
6868

69-
serialized_content = self.serializer.serialize(context) + '\n'
7069
log.info(
7170
"Parsed agent endpoint (%s) %s:%s",
7271
self.endpoint.scheme,
7372
self.endpoint.hostname,
7473
self.endpoint.port,
7574
)
76-
self.client.send_message(serialized_content.encode('utf-8'))
75+
for serialized_content in self.serializer.serialize(context):
76+
message = serialized_content + "\n"
77+
self.client.send_message(message.encode('utf-8'))
7778

7879
@staticmethod
7980
def name() -> str:

aws_embedded_metrics/sinks/lambda_sink.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def __init__(self, serializer: Serializer = LogSerializer()):
2222
self.serializer = serializer
2323

2424
def accept(self, context: MetricsContext) -> None:
25-
print(self.serializer.serialize(context))
25+
for serialized_content in self.serializer.serialize(context):
26+
print(serialized_content)
2627

2728
@staticmethod
2829
def name() -> str:

tests/serializer/test_log_serializer.py

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def test_serialize_dimensions():
2222
context.put_dimensions(dimensions)
2323

2424
# act
25-
result_json = serializer.serialize(context)
25+
result_json = serializer.serialize(context)[0]
2626

2727
# assert
2828
assert_json_equality(result_json, expected)
@@ -53,7 +53,7 @@ def test_cannot_serialize_more_than_9_dimensions():
5353
context.put_dimensions(dimensions)
5454

5555
# act
56-
result_json = serializer.serialize(context)
56+
result_json = serializer.serialize(context)[0]
5757

5858
# assert
5959
assert_json_equality(result_json, expected)
@@ -71,7 +71,7 @@ def test_serialize_properties():
7171
context.set_property(expected_key, expected_value)
7272

7373
# act
74-
result_json = serializer.serialize(context)
74+
result_json = serializer.serialize(context)[0]
7575

7676
# assert
7777
assert_json_equality(result_json, expected)
@@ -94,12 +94,89 @@ def test_serialize_metrics():
9494
context.put_metric(expected_key, expected_value)
9595

9696
# act
97-
result_json = serializer.serialize(context)
97+
result_json = serializer.serialize(context)[0]
9898

9999
# assert
100100
assert_json_equality(result_json, expected)
101101

102102

103+
def test_serialize_more_than_100_metrics():
104+
# arrange
105+
expected_value = fake.word()
106+
expected_batches = 3
107+
metrics = 295
108+
109+
context = get_context()
110+
for index in range(metrics):
111+
expected_key = f"Metric-{index}"
112+
context.put_metric(expected_key, expected_value)
113+
114+
# act
115+
results = serializer.serialize(context)
116+
117+
# assert
118+
assert len(results) == expected_batches
119+
120+
metric_index = 0
121+
for batch_index in range(expected_batches):
122+
expected_metric_count = metrics % 100 if (batch_index == expected_batches - 1) else 100
123+
result_json = results[batch_index]
124+
result_obj = json.loads(result_json)
125+
assert len(result_obj["_aws"]["CloudWatchMetrics"][0]["Metrics"]) == expected_metric_count
126+
127+
for index in range(expected_metric_count):
128+
assert result_obj[f"Metric-{metric_index}"] == expected_value
129+
metric_index += 1
130+
131+
132+
def test_serialize_with_multiple_metrics():
133+
# arrange
134+
metrics = 2
135+
expected = {**get_empty_payload()}
136+
context = get_context()
137+
138+
for index in range(metrics):
139+
expected_key = f"Metric-{index}"
140+
expected_value = fake.word()
141+
context.put_metric(expected_key, expected_value)
142+
143+
expected_metric_definition = {"Name": expected_key, "Unit": "None"}
144+
expected[expected_key] = expected_value
145+
expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(
146+
expected_metric_definition
147+
)
148+
149+
# act
150+
results = serializer.serialize(context)
151+
152+
# assert
153+
assert len(results) == 1
154+
assert results == [json.dumps(expected)]
155+
156+
157+
def test_serialize_metrics_with_multiple_datapoints():
158+
# arrange
159+
expected_key = fake.word()
160+
expected_values = [fake.word(), fake.word()]
161+
expected_metric_definition = {"Name": expected_key, "Unit": "None"}
162+
expected = {**get_empty_payload()}
163+
expected[expected_key] = expected_values
164+
expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(
165+
expected_metric_definition
166+
)
167+
168+
context = get_context()
169+
for expected_value in expected_values:
170+
context.put_metric(expected_key, expected_value)
171+
172+
# act
173+
results = serializer.serialize(context)
174+
175+
# assert
176+
assert len(results) == 1
177+
assert results == [json.dumps(expected)]
178+
179+
103180
# Test utility method
104181

105182

tests/sinks/test_agent_sink.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
from aws_embedded_metrics.logger.metrics_context import MetricsContext
12
from aws_embedded_metrics.sinks.agent_sink import AgentSink
23
from aws_embedded_metrics.config import get_config
4+
from unittest.mock import patch, Mock
5+
36

47
Config = get_config()
58

@@ -70,3 +73,23 @@ def test_fallback_to_default_endpoint_on_parse_failure():
7073
assert sink.endpoint.hostname == expected_hostname
7174
assert sink.endpoint.port == expected_port
7275
assert sink.endpoint.scheme == expected_protocol
76+
77+
78+
@patch("aws_embedded_metrics.sinks.agent_sink.get_socket_client")
79+
def test_more_than_max_number_of_metrics(mock_get_socket_client):
80+
# arrange
81+
context = MetricsContext.empty()
82+
expected_metrics = 401
83+
expected_send_message_calls = 5
84+
for index in range(expected_metrics):
85+
context.put_metric(f"{index}", 1)
86+
87+
mock_tcp_client = Mock()
88+
mock_get_socket_client.return_value = mock_tcp_client
89+
90+
# act
91+
sink = AgentSink("")
92+
sink.accept(context)
93+
94+
# assert
95+
assert expected_send_message_calls == mock_tcp_client.send_message.call_count

tests/sinks/test_lambda_sink.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
from aws_embedded_metrics.sinks.lambda_sink import LambdaSink
22
from aws_embedded_metrics.logger.metrics_context import MetricsContext
3+
from faker import Faker
4+
from unittest.mock import patch
5+
6+
7+
fake = Faker()
38

49

510
def test_accept_writes_to_stdout(capfd):
@@ -17,3 +22,21 @@ def test_accept_writes_to_stdout(capfd):
1722
out
1823
== '{"_aws": {"Timestamp": 1, "CloudWatchMetrics": [{"Dimensions": [], "Metrics": [], "Namespace": "aws-embedded-metrics"}]}}\n'
1924
)
25+
26+
27+
@patch("aws_embedded_metrics.serializers.log_serializer.LogSerializer")
28+
def test_accept_writes_multiple_messages_to_stdout(mock_serializer, capfd):
29+
# arrange
30+
expected_messages = [fake.word() for _ in range(10)]
31+
mock_serializer.serialize.return_value = expected_messages
32+
sink = LambdaSink(serializer=mock_serializer)
33+
context = MetricsContext.empty()
34+
context.meta["Timestamp"] = 1
35+
36+
# act
37+
sink.accept(context)
38+
39+
# assert
40+
out, err = capfd.readouterr()
41+
assert len(out.split()) == len(expected_messages)
42+
assert out.split() == expected_messages

0 commit comments

Comments (0)