Skip to content

Commit 3e5ccce

Browse files
authored
Split log entries so each entry contains a maximum of 100 datapoints (#64)
* Add test on not emitting more than 100 datapoints per log entry. * Limit the count of datapoints per log batch to 100. * Allow passing arguments to pytest from tox. * Limit datapoints to 100 per metric instead of 100 per batch. * Add test with a single metric with more datapoints than others. * Add test to serialize data with more than 100 metrics and more than 100 datapoints per metric. * Add comments and rename confusing variable name. * Add extra asserts. Compute the full expected results so we ensure the slicing logic doesn't cause any datapoint to be lost.
1 parent f645c6b commit 3e5ccce

File tree

4 files changed

+122
-19
lines changed

4 files changed

+122
-19
lines changed

aws_embedded_metrics/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414
DEFAULT_NAMESPACE = "aws-embedded-metrics"
1515
MAX_DIMENSIONS = 9
1616
MAX_METRICS_PER_EVENT = 100
17+
MAX_DATAPOINTS_PER_METRIC = 100

aws_embedded_metrics/serializers/log_serializer.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
from aws_embedded_metrics.config import get_config
1515
from aws_embedded_metrics.logger.metrics_context import MetricsContext
1616
from aws_embedded_metrics.serializers import Serializer
17-
from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
17+
from aws_embedded_metrics.constants import (
18+
MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_METRIC
19+
)
1820
import json
1921
from typing import Any, Dict, List
2022

@@ -50,29 +52,48 @@ def create_body() -> Dict[str, Any]:
5052
}
5153
return body
5254

53-
current_body: Dict[str, Any] = create_body()
55+
current_body: Dict[str, Any] = {}
5456
event_batches: List[str] = []
5557
num_metrics_in_current_body = 0
5658

57-
for metric_name, metric in context.metrics.items():
59+
# Track if any given metric has data remaining to be serialized
60+
remaining_data = True
5861

59-
if len(metric.values) == 1:
60-
current_body[metric_name] = metric.values[0]
61-
else:
62-
current_body[metric_name] = metric.values
62+
# Track batch number to know where to slice metric data
63+
i = 0
6364

64-
if not config.disable_metric_extraction:
65-
current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
65+
while remaining_data:
66+
remaining_data = False
67+
current_body = create_body()
6668

67-
num_metrics_in_current_body += 1
69+
for metric_name, metric in context.metrics.items():
6870

69-
should_serialize: bool = num_metrics_in_current_body == MAX_METRICS_PER_EVENT
70-
if should_serialize:
71-
event_batches.append(json.dumps(current_body))
72-
current_body = create_body()
73-
num_metrics_in_current_body = 0
71+
if len(metric.values) == 1:
72+
current_body[metric_name] = metric.values[0]
73+
else:
74+
# Slice metric data as each batch cannot contain more than
75+
# MAX_DATAPOINTS_PER_METRIC entries for a given metric
76+
start_index = i * MAX_DATAPOINTS_PER_METRIC
77+
end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
78+
current_body[metric_name] = metric.values[start_index:end_index]
79+
80+
# Make sure to consume remaining values if we sliced before the end
81+
# of the metric value list
82+
if len(metric.values) > end_index:
83+
remaining_data = True
7484

75-
if not event_batches or num_metrics_in_current_body > 0:
76-
event_batches.append(json.dumps(current_body))
85+
if not config.disable_metric_extraction:
86+
current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
87+
num_metrics_in_current_body += 1
88+
89+
if (num_metrics_in_current_body == MAX_METRICS_PER_EVENT):
90+
event_batches.append(json.dumps(current_body))
91+
current_body = create_body()
92+
num_metrics_in_current_body = 0
93+
94+
# iter over missing datapoints
95+
i += 1
96+
if not event_batches or num_metrics_in_current_body > 0:
97+
event_batches.append(json.dumps(current_body))
7798

7899
return event_batches

tests/serializer/test_log_serializer.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from aws_embedded_metrics.config import get_config
22
from aws_embedded_metrics.logger.metrics_context import MetricsContext
33
from aws_embedded_metrics.serializers.log_serializer import LogSerializer
4+
from collections import Counter
45
from faker import Faker
56
import json
67

@@ -130,6 +131,86 @@ def test_serialize_more_than_100_metrics():
130131
metric_index += 1
131132

132133

134+
def test_serialize_more_than_100_datapoints():
135+
expected_batches = 3
136+
datapoints = 295
137+
metrics = 3
138+
139+
context = get_context()
140+
for index in range(metrics):
141+
expected_key = f"Metric-{index}"
142+
for i in range(datapoints):
143+
context.put_metric(expected_key, i)
144+
145+
# add one metric with more datapoints
146+
expected_extra_batches = 2
147+
extra_datapoints = 433
148+
for i in range(extra_datapoints):
149+
context.put_metric(f"Metric-{metrics}", i)
150+
151+
# act
152+
results = serializer.serialize(context)
153+
154+
# assert
155+
assert len(results) == expected_batches + expected_extra_batches
156+
157+
for batch_index in range(expected_batches):
158+
result_json = results[batch_index]
159+
result_obj = json.loads(result_json)
160+
for index in range(metrics):
161+
metric_name = f"Metric-{index}"
162+
expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
163+
assert len(result_obj[metric_name]) == expected_datapoint_count
164+
165+
# extra metric with more datapoints
166+
for batch_index in range(expected_batches):
167+
result_json = results[batch_index]
168+
result_obj = json.loads(result_json)
169+
metric_name = f"Metric-{metrics}"
170+
expected_datapoint_count = extra_datapoints % 100 if (batch_index == expected_batches + expected_extra_batches - 1) else 100
171+
assert len(result_obj[metric_name]) == expected_datapoint_count
172+
173+
174+
def test_serialize_with_more_than_100_metrics_and_datapoints():
175+
expected_batches = 11
176+
datapoints = 295
177+
metrics = 295
178+
179+
expected_results = {}
180+
metric_results = {}
181+
context = get_context()
182+
for index in range(metrics):
183+
expected_key = f"Metric-{index}"
184+
expected_results[expected_key] = []
185+
metric_results[expected_key] = []
186+
187+
for i in range(datapoints):
188+
context.put_metric(expected_key, i)
189+
expected_results[expected_key].append(i)
190+
191+
# act
192+
results = serializer.serialize(context)
193+
194+
# assert
195+
assert len(results) == expected_batches
196+
197+
datapoints_count = Counter()
198+
for batch in results:
199+
result = json.loads(batch)
200+
datapoints_count.update({
201+
metric: len(result[metric])
202+
for metric in result if metric != "_aws"
203+
})
204+
for metric in result:
205+
if metric != "_aws":
206+
metric_results[metric] += result[metric]
207+
208+
for count in datapoints_count.values():
209+
assert count == datapoints
210+
assert len(datapoints_count) == metrics
211+
assert metric_results == expected_results
212+
213+
133214
def test_serialize_with_multiple_metrics():
134215
# arrange
135216
metrics = 2

tox.ini

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ deps =
99
Faker
1010
aiohttp
1111
aresponses
12-
commands = pytest --ignore=tests/integ
12+
commands = pytest --ignore=tests/integ {posargs}
1313

1414
[testenv:integ]
1515
deps =
@@ -38,4 +38,4 @@ commands =
3838

3939
[flake8]
4040
max-line-length = 150
41-
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
41+
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv

0 commit comments

Comments
 (0)