Split log entries so each entry contains a maximum of 100 datapoints #64

Merged · 8 commits · Jan 28, 2021
1 change: 1 addition & 0 deletions aws_embedded_metrics/constants.py
@@ -14,3 +14,4 @@
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
 MAX_METRICS_PER_EVENT = 100
+MAX_DATAPOINTS_PER_METRIC = 100
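
As a quick illustration of what this new cap implies (a sketch, not part of the diff): the number of log events a single metric's datapoints occupy is a ceiling division by MAX_DATAPOINTS_PER_METRIC.

import math

MAX_DATAPOINTS_PER_METRIC = 100

# e.g. 295 recorded values span 3 log events: 100 + 100 + 95
assert math.ceil(295 / MAX_DATAPOINTS_PER_METRIC) == 3
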
55 changes: 38 additions & 17 deletions aws_embedded_metrics/serializers/log_serializer.py
@@ -14,7 +14,9 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
-from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
+from aws_embedded_metrics.constants import (
+    MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_METRIC
+)
 import json
 from typing import Any, Dict, List

@@ -50,29 +52,48 @@ def create_body() -> Dict[str, Any]:
             }
             return body
 
-        current_body: Dict[str, Any] = create_body()
+        current_body: Dict[str, Any] = {}
         event_batches: List[str] = []
         num_metrics_in_current_body = 0
 
-        for metric_name, metric in context.metrics.items():
+        # Track whether any metric still has data remaining to be serialized
+        remaining_data = True
 
-            if len(metric.values) == 1:
-                current_body[metric_name] = metric.values[0]
-            else:
-                current_body[metric_name] = metric.values
+        # Track the batch number to know where to slice each metric's data
+        i = 0
 
-            if not config.disable_metric_extraction:
-                current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+        while remaining_data:
+            remaining_data = False
+            current_body = create_body()
 
-            num_metrics_in_current_body += 1
+            for metric_name, metric in context.metrics.items():
 
-            should_serialize: bool = num_metrics_in_current_body == MAX_METRICS_PER_EVENT
-            if should_serialize:
-                event_batches.append(json.dumps(current_body))
-                current_body = create_body()
-                num_metrics_in_current_body = 0
+                if len(metric.values) == 1:
+                    current_body[metric_name] = metric.values[0]
+                else:
+                    # Slice metric data as each batch cannot contain more than
+                    # MAX_DATAPOINTS_PER_METRIC entries for a given metric
+                    start_index = i * MAX_DATAPOINTS_PER_METRIC
+                    end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
+                    current_body[metric_name] = metric.values[start_index:end_index]
+
+                    # Make sure to consume remaining values if we sliced before the end
+                    # of the metric value list
+                    if len(metric.values) > end_index:
+                        remaining_data = True
 
-        if not event_batches or num_metrics_in_current_body > 0:
-            event_batches.append(json.dumps(current_body))
+                if not config.disable_metric_extraction:
+                    current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+                num_metrics_in_current_body += 1
+
+                if (num_metrics_in_current_body == MAX_METRICS_PER_EVENT):
+                    event_batches.append(json.dumps(current_body))
+                    current_body = create_body()
+                    num_metrics_in_current_body = 0
+
+            # iterate again over any remaining datapoints
+            i += 1
+            if not event_batches or num_metrics_in_current_body > 0:
+                event_batches.append(json.dumps(current_body))
 
         return event_batches
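
For a sense of the resulting behavior, here is a minimal sketch (not part of this PR) against the library's public API; it assumes MetricsContext.empty() for a bare context, and the metric name "Latency" is purely illustrative:

from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.serializers.log_serializer import LogSerializer
import json

# Record 295 datapoints against a single metric.
context = MetricsContext.empty()
for value in range(295):
    context.put_metric("Latency", value)

# The slicing above should yield three events:
# two with 100 datapoints each and a final one with the remaining 95.
events = LogSerializer().serialize(context)
assert len(events) == 3
assert [len(json.loads(e)["Latency"]) for e in events] == [100, 100, 95]
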
81 changes: 81 additions & 0 deletions tests/serializer/test_log_serializer.py
@@ -1,6 +1,7 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers.log_serializer import LogSerializer
+from collections import Counter
 from faker import Faker
 import json

@@ -130,6 +131,86 @@ def test_serialize_more_than_100_metrics():
         metric_index += 1
 
 
+def test_serialize_more_than_100_datapoints():
+    expected_batches = 3
+    datapoints = 295
+    metrics = 3
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+
+    # add one metric with more datapoints
+    expected_extra_batches = 2
+    extra_datapoints = 433
+    for i in range(extra_datapoints):
+        context.put_metric(f"Metric-{metrics}", i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches + expected_extra_batches
+
+    for batch_index in range(expected_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        for index in range(metrics):
+            metric_name = f"Metric-{index}"
+            expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
+            assert len(result_obj[metric_name]) == expected_datapoint_count
+
+    # extra metric with more datapoints: check every batch, including the extra ones
+    for batch_index in range(expected_batches + expected_extra_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        metric_name = f"Metric-{metrics}"
+        expected_datapoint_count = extra_datapoints % 100 if (batch_index == expected_batches + expected_extra_batches - 1) else 100
+        assert len(result_obj[metric_name]) == expected_datapoint_count
+
+
+def test_serialize_with_more_than_100_metrics_and_datapoints():
+    expected_batches = 11
+    datapoints = 295
+    metrics = 295
+
+    expected_results = {}
+    metric_results = {}
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        expected_results[expected_key] = []
+        metric_results[expected_key] = []
+
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+            expected_results[expected_key].append(i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches
+
+    datapoints_count = Counter()
+    for batch in results:
    [Inline review thread on the line above]
    Member: should we have an assertion on the number of expected events?
    Author (Contributor): I've added an extra assert on the batch count. I've also added logic and an assert to validate that the slicing logic doesn't drop any datapoints.

+        result = json.loads(batch)
+        datapoints_count.update({
+            metric: len(result[metric])
+            for metric in result if metric != "_aws"
+        })
+        for metric in result:
+            if metric != "_aws":
+                metric_results[metric] += result[metric]
+
+    for count in datapoints_count.values():
+        assert count == datapoints
+    assert len(datapoints_count) == metrics
+    assert metric_results == expected_results
+
+
 def test_serialize_with_multiple_metrics():
     # arrange
     metrics = 2
4 changes: 2 additions & 2 deletions tox.ini
@@ -9,7 +9,7 @@ deps =
     Faker
     aiohttp
     aresponses
-commands = pytest --ignore=tests/integ
+commands = pytest --ignore=tests/integ {posargs}
 
 [testenv:integ]
 deps =
@@ -38,4 +38,4 @@ commands =
 
 [flake8]
 max-line-length = 150
-exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
\ No newline at end of file
+exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
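
With {posargs} in place, anything after -- on the tox command line is forwarded to pytest, so a single test can be run in isolation, for example:

tox -- tests/serializer/test_log_serializer.py::test_serialize_more_than_100_datapoints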