From fe1a29c47b99bc0f014f8fe1de3834ac3c736c4b Mon Sep 17 00:00:00 2001
From: Paul Ezvan
Date: Fri, 15 Jan 2021 16:36:54 +0100
Subject: [PATCH 1/8] Add test on not emitting more than 100 datapoints per
 log entry.

---
 tests/serializer/test_log_serializer.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index 32795f7..bbec7c2 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -130,6 +130,31 @@ def test_serialize_more_than_100_metrics():
         metric_index += 1
 
 
+def test_serialize_more_than_100_datapoints():
+    expected_value = fake.word()
+    expected_batches = 6
+    datapoints = 195
+    metrics = 3
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        for _ in range(datapoints):
+            context.put_metric(expected_key, expected_value)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    #assert len(results) == expected_batches
+
+    for batch_index in range(expected_batches):
+        expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        datapoint_count = sum([len(result_obj[f"Metric-{index}"]) for index in range(metrics)])
+        assert datapoint_count == expected_datapoint_count
+
+
 def test_serialize_with_multiple_metrics():
     # arrange
     metrics = 2

From 091592bed2710cf0d3bc86cc8a69575ebaec019a Mon Sep 17 00:00:00 2001
From: Paul Ezvan
Date: Fri, 15 Jan 2021 19:05:12 +0100
Subject: [PATCH 2/8] Limit the count of datapoints per log batch to 100.

---
 aws_embedded_metrics/constants.py       |  1 +
 .../serializers/log_serializer.py       | 57 ++++++++++++++-----
 tests/serializer/test_log_serializer.py | 17 ++++--
 3 files changed, 57 insertions(+), 18 deletions(-)

diff --git a/aws_embedded_metrics/constants.py b/aws_embedded_metrics/constants.py
index 756e9cd..79300eb 100644
--- a/aws_embedded_metrics/constants.py
+++ b/aws_embedded_metrics/constants.py
@@ -14,3 +14,4 @@
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
 MAX_METRICS_PER_EVENT = 100
+MAX_DATAPOINTS_PER_EVENT = 100

diff --git a/aws_embedded_metrics/serializers/log_serializer.py b/aws_embedded_metrics/serializers/log_serializer.py
index fcc496a..0615b4e 100644
--- a/aws_embedded_metrics/serializers/log_serializer.py
+++ b/aws_embedded_metrics/serializers/log_serializer.py
@@ -14,7 +14,9 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
-from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
+from aws_embedded_metrics.constants import (
+    MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_EVENT
+)
 import json
 from typing import Any, Dict, List
 
@@ -53,24 +55,53 @@ def create_body() -> Dict[str, Any]:
         current_body: Dict[str, Any] = create_body()
         event_batches: List[str] = []
         num_metrics_in_current_body = 0
+        num_datapoints_in_current_body = 0
+        last_datapoint_index = 0
 
         for metric_name, metric in context.metrics.items():
-            if len(metric.values) == 1:
-                current_body[metric_name] = metric.values[0]
-            else:
-                current_body[metric_name] = metric.values
+            consumed_datapoints = 0
 
-            if not config.disable_metric_extraction:
-                current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+            datapoint_count = len(metric.values) if len(metric.values) > 1 else 1
+            while consumed_datapoints < datapoint_count:
+
+                if len(metric.values) == 1:
+                    current_body[metric_name] = metric.values[0]
+                    num_datapoints_in_current_body += 1
+                    consumed_datapoints += 1
+                elif (
+                    len(metric.values) + num_datapoints_in_current_body - last_datapoint_index >
+                    MAX_DATAPOINTS_PER_EVENT
+                ):
+                    current_body[metric_name] = metric.values[
+                        last_datapoint_index: last_datapoint_index + MAX_DATAPOINTS_PER_EVENT - num_datapoints_in_current_body
+                    ]
+                    last_datapoint_index = MAX_DATAPOINTS_PER_EVENT - num_datapoints_in_current_body
+                    consumed_datapoints += MAX_DATAPOINTS_PER_EVENT - num_datapoints_in_current_body
+                    num_datapoints_in_current_body = MAX_DATAPOINTS_PER_EVENT
+                elif (last_datapoint_index > 0):
+                    current_body[metric_name] = metric.values[last_datapoint_index:]
+                    last_datapoint_index = 0
+                    num_datapoints_in_current_body += len(current_body[metric_name])
+                    consumed_datapoints += len(current_body[metric_name])
+                else:
+                    current_body[metric_name] = metric.values
+                    num_datapoints_in_current_body += len(current_body[metric_name])
+                    consumed_datapoints += len(current_body[metric_name])
+
+                if not config.disable_metric_extraction:
+                    current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
 
-            num_metrics_in_current_body += 1
+                num_metrics_in_current_body += 1
 
-            should_serialize: bool = num_metrics_in_current_body == MAX_METRICS_PER_EVENT
-            if should_serialize:
-                event_batches.append(json.dumps(current_body))
-                current_body = create_body()
-                num_metrics_in_current_body = 0
+                if (
+                    num_metrics_in_current_body == MAX_METRICS_PER_EVENT or
+                    num_datapoints_in_current_body >= MAX_DATAPOINTS_PER_EVENT
+                ):
+                    event_batches.append(json.dumps(current_body))
+                    current_body = create_body()
+                    num_metrics_in_current_body = 0
+                    num_datapoints_in_current_body = 0
 
         if not event_batches or num_metrics_in_current_body > 0:
             event_batches.append(json.dumps(current_body))
 

diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index bbec7c2..9765b5d 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -131,7 +131,6 @@ def test_serialize_more_than_100_metrics():
 
 
 def test_serialize_more_than_100_datapoints():
-    expected_value = fake.word()
     expected_batches = 6
     datapoints = 195
     metrics = 3
@@ -139,22 +138,30 @@ def test_serialize_more_than_100_datapoints():
     context = get_context()
     for index in range(metrics):
         expected_key = f"Metric-{index}"
-        for _ in range(datapoints):
-            context.put_metric(expected_key, expected_value)
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
 
     # act
     results = serializer.serialize(context)
 
     # assert
-    #assert len(results) == expected_batches
+    assert len(results) == expected_batches
 
     for batch_index in range(expected_batches):
         expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
         result_json = results[batch_index]
         result_obj = json.loads(result_json)
-        datapoint_count = sum([len(result_obj[f"Metric-{index}"]) for index in range(metrics)])
+        datapoint_count = sum(
+            [
+                len(result_obj.get(
+                    f"Metric-{index}", list()
+                ))
+                for index in range(metrics)
+            ]
+        )
         assert datapoint_count == expected_datapoint_count
 
+
 def test_serialize_with_multiple_metrics():
     # arrange
     metrics = 2
From 819cde5ce41d6c87295a4e6da090d1ba3c4c52cb Mon Sep 17 00:00:00 2001
From: paulezva
Date: Mon, 18 Jan 2021 19:26:59 +0100
Subject: [PATCH 3/8] Allow passing arguments to pytest from tox.

---
 tox.ini | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tox.ini b/tox.ini
index 1a33dfa..fe51c57 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,7 +9,7 @@ deps =
     Faker
     aiohttp
     aresponses
-commands = pytest --ignore=tests/integ
+commands = pytest --ignore=tests/integ {posargs}
 
 [testenv:integ]
 deps =
@@ -38,4 +38,4 @@ commands =
 
 [flake8]
 max-line-length = 150
-exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
\ No newline at end of file
+exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv

From b476772e4617a58a31ec53d56887580f99c43690 Mon Sep 17 00:00:00 2001
From: paulezva
Date: Mon, 18 Jan 2021 19:46:06 +0100
Subject: [PATCH 4/8] Limit datapoints to 100 per metric instead of 100 per
 batch.

---
 aws_embedded_metrics/constants.py       |  2 +-
 .../serializers/log_serializer.py       | 55 ++++++++-------------
 tests/serializer/test_log_serializer.py | 18 ++----
 3 files changed, 25 insertions(+), 50 deletions(-)

diff --git a/aws_embedded_metrics/constants.py b/aws_embedded_metrics/constants.py
index 79300eb..6f99bfa 100644
--- a/aws_embedded_metrics/constants.py
+++ b/aws_embedded_metrics/constants.py
@@ -14,4 +14,4 @@
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
 MAX_METRICS_PER_EVENT = 100
-MAX_DATAPOINTS_PER_EVENT = 100
+MAX_DATAPOINTS_PER_METRIC = 100

diff --git a/aws_embedded_metrics/serializers/log_serializer.py b/aws_embedded_metrics/serializers/log_serializer.py
index 0615b4e..786bf66 100644
--- a/aws_embedded_metrics/serializers/log_serializer.py
+++ b/aws_embedded_metrics/serializers/log_serializer.py
@@ -15,7 +15,7 @@
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
 from aws_embedded_metrics.constants import (
-    MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_EVENT
+    MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_METRIC
 )
 import json
 from typing import Any, Dict, List
@@ -52,58 +52,39 @@ def create_body() -> Dict[str, Any]:
                 }
             return body
 
-        current_body: Dict[str, Any] = create_body()
+        current_body: Dict[str, Any] = {}
         event_batches: List[str] = []
         num_metrics_in_current_body = 0
-        num_datapoints_in_current_body = 0
-        last_datapoint_index = 0
+        missing_data = True
+        i = 0
 
-        for metric_name, metric in context.metrics.items():
+        while missing_data:
+            missing_data = False
+            current_body = create_body()
 
-            consumed_datapoints = 0
-
-            datapoint_count = len(metric.values) if len(metric.values) > 1 else 1
-            while consumed_datapoints < datapoint_count:
+            for metric_name, metric in context.metrics.items():
 
                 if len(metric.values) == 1:
                     current_body[metric_name] = metric.values[0]
-                    num_datapoints_in_current_body += 1
-                    consumed_datapoints += 1
-                elif (
-                    len(metric.values) + num_datapoints_in_current_body - last_datapoint_index >
-                    MAX_DATAPOINTS_PER_EVENT
-                ):
-                    current_body[metric_name] = metric.values[
-                        last_datapoint_index: last_datapoint_index + MAX_DATAPOINTS_PER_EVENT - num_datapoints_in_current_body
-                    ]
-                    last_datapoint_index = MAX_DATAPOINTS_PER_EVENT - num_datapoints_in_current_body
-                    consumed_datapoints += MAX_DATAPOINTS_PER_EVENT - num_datapoints_in_current_body
-                    num_datapoints_in_current_body = MAX_DATAPOINTS_PER_EVENT
-                elif (last_datapoint_index > 0):
-                    current_body[metric_name] = metric.values[last_datapoint_index:]
-                    last_datapoint_index = 0
-                    num_datapoints_in_current_body += len(current_body[metric_name])
-                    consumed_datapoints += len(current_body[metric_name])
                 else:
-                    current_body[metric_name] = metric.values
-                    num_datapoints_in_current_body += len(current_body[metric_name])
-                    consumed_datapoints += len(current_body[metric_name])
+                    start_index = i * MAX_DATAPOINTS_PER_METRIC
+                    end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
+                    current_body[metric_name] = metric.values[start_index:end_index]
+                    if len(metric.values) > end_index:
+                        missing_data = True
 
                 if not config.disable_metric_extraction:
                     current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
-                num_metrics_in_current_body += 1
+                num_metrics_in_current_body += 1
 
-                if (
-                    num_metrics_in_current_body == MAX_METRICS_PER_EVENT or
-                    num_datapoints_in_current_body >= MAX_DATAPOINTS_PER_EVENT
-                ):
+                if (num_metrics_in_current_body == MAX_METRICS_PER_EVENT):
                     event_batches.append(json.dumps(current_body))
                     current_body = create_body()
                     num_metrics_in_current_body = 0
-                    num_datapoints_in_current_body = 0
 
-        if not event_batches or num_metrics_in_current_body > 0:
-            event_batches.append(json.dumps(current_body))
+            # iter over missing datapoints
+            i += 1
+            if not event_batches or num_metrics_in_current_body > 0:
+                event_batches.append(json.dumps(current_body))
 
         return event_batches

diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index 9765b5d..7fb4a83 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -131,8 +131,8 @@ def test_serialize_more_than_100_metrics():
 
 
 def test_serialize_more_than_100_datapoints():
-    expected_batches = 6
-    datapoints = 195
+    expected_batches = 3
+    datapoints = 295
     metrics = 3
 
     context = get_context()
@@ -148,18 +148,12 @@ def test_serialize_more_than_100_datapoints():
     assert len(results) == expected_batches
 
     for batch_index in range(expected_batches):
-        expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
         result_json = results[batch_index]
         result_obj = json.loads(result_json)
-        datapoint_count = sum(
-            [
-                len(result_obj.get(
-                    f"Metric-{index}", list()
-                ))
-                for index in range(metrics)
-            ]
-        )
-        assert datapoint_count == expected_datapoint_count
+        for index in range(metrics):
+            metric_name = f"Metric-{index}"
+            expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
+            assert len(result_obj[metric_name]) == expected_datapoint_count
 
 
 def test_serialize_with_multiple_metrics():
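
Not part of the patch series: a minimal standalone sketch of the slicing arithmetic that PATCH 4/8 introduces, with the names reused from the diff and MAX_DATAPOINTS_PER_METRIC = 100 as set in constants.py.

    # Sketch: how pass number i maps to a slice of one metric's values.
    MAX_DATAPOINTS_PER_METRIC = 100
    values = list(range(295))  # one metric with 295 datapoints, as in the updated test

    batches = []
    i = 0
    while i * MAX_DATAPOINTS_PER_METRIC < len(values):
        start_index = i * MAX_DATAPOINTS_PER_METRIC
        end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
        batches.append(values[start_index:end_index])
        i += 1

    assert [len(batch) for batch in batches] == [100, 100, 95]

This is why 295 datapoints per metric are expected to span 3 batches, which is what the reworked test asserts.
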
From deaa26802b4d929bfa69f0ed176101510c5fa662 Mon Sep 17 00:00:00 2001
From: paulezva
Date: Mon, 18 Jan 2021 19:55:54 +0100
Subject: [PATCH 5/8] Add test with a single metric with more datapoints than
 others.

---
 tests/serializer/test_log_serializer.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index 7fb4a83..d40d11b 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -141,11 +141,17 @@ def test_serialize_more_than_100_datapoints():
         for i in range(datapoints):
             context.put_metric(expected_key, i)
 
+    # add one metric with more datapoints
+    expected_extra_batches = 2
+    extra_datapoints = 433
+    for i in range(extra_datapoints):
+        context.put_metric(f"Metric-{metrics}", i)
+
     # act
     results = serializer.serialize(context)
 
     # assert
-    assert len(results) == expected_batches
+    assert len(results) == expected_batches + expected_extra_batches
 
     for batch_index in range(expected_batches):
         result_json = results[batch_index]
@@ -155,6 +161,14 @@ def test_serialize_more_than_100_datapoints():
             expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
             assert len(result_obj[metric_name]) == expected_datapoint_count
 
+    # extra metric with more datapoints
+    for batch_index in range(expected_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        metric_name = f"Metric-{metrics}"
+        expected_datapoint_count = extra_datapoints % 100 if (batch_index == expected_batches + expected_extra_batches - 1) else 100
+        assert len(result_obj[metric_name]) == expected_datapoint_count
+
 
 def test_serialize_with_multiple_metrics():
     # arrange

From 26340ceb0a2916042ab369cc4b9d16034525317f Mon Sep 17 00:00:00 2001
From: paulezva
Date: Mon, 18 Jan 2021 20:37:12 +0100
Subject: [PATCH 6/8] Add test to serialize data with more than 100 metrics
 and more than 100 datapoints per metric.

---
 tests/serializer/test_log_serializer.py | 28 +++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index d40d11b..c2d8802 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -1,6 +1,7 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers.log_serializer import LogSerializer
+from collections import Counter
 from faker import Faker
 
 import json
@@ -170,6 +171,33 @@ def test_serialize_more_than_100_datapoints():
         assert len(result_obj[metric_name]) == expected_datapoint_count
 
 
+def test_serialize_with_more_than_100_metrics_and_datapoints():
+    datapoints = 295
+    metrics = 295
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    datapoints_count = Counter()
+    for batch in results:
+        result = json.loads(batch)
+        datapoints_count.update({
+            metric: len(result[metric])
+            for metric in result if metric != "_aws"
+        })
+
+    for count in datapoints_count.values():
+        assert count == datapoints
+    assert len(datapoints_count) == metrics
+
+
 def test_serialize_with_multiple_metrics():
     # arrange
     metrics = 2
From f78c5eb9435577c61ad75220bb6e7d9c391bfc3c Mon Sep 17 00:00:00 2001
From: paulezva
Date: Thu, 28 Jan 2021 15:47:53 +0100
Subject: [PATCH 7/8] Add comments and rename confusing variable name.

---
 .../serializers/log_serializer.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/aws_embedded_metrics/serializers/log_serializer.py b/aws_embedded_metrics/serializers/log_serializer.py
index 786bf66..d5d7ec8 100644
--- a/aws_embedded_metrics/serializers/log_serializer.py
+++ b/aws_embedded_metrics/serializers/log_serializer.py
@@ -55,11 +55,15 @@ def create_body() -> Dict[str, Any]:
         current_body: Dict[str, Any] = {}
         event_batches: List[str] = []
         num_metrics_in_current_body = 0
-        missing_data = True
+
+        # Track if any given metric has data remaining to be serialized
+        remaining_data = True
+
+        # Track batch number to know where to slice metric data
         i = 0
 
-        while missing_data:
-            missing_data = False
+        while remaining_data:
+            remaining_data = False
             current_body = create_body()
 
             for metric_name, metric in context.metrics.items():
@@ -67,11 +71,16 @@ def create_body() -> Dict[str, Any]:
                 if len(metric.values) == 1:
                     current_body[metric_name] = metric.values[0]
                 else:
+                    # Slice metric data as each batch cannot contain more than
+                    # MAX_DATAPOINTS_PER_METRIC entries for a given metric
                     start_index = i * MAX_DATAPOINTS_PER_METRIC
                     end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
                     current_body[metric_name] = metric.values[start_index:end_index]
+
+                    # Make sure to consume remaining values if we sliced before the end
+                    # of the metric value list
                     if len(metric.values) > end_index:
-                        missing_data = True
+                        remaining_data = True
 
                 if not config.disable_metric_extraction:
                     current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})

From 5d62f5251736da8fb28cd47c873061817012675b Mon Sep 17 00:00:00 2001
From: paulezva
Date: Thu, 28 Jan 2021 15:48:35 +0100
Subject: [PATCH 8/8] Add extra asserts. Compute the full expected results so
 we ensure the slicing logic doesn't cause any datapoint to be lost.

---
 tests/serializer/test_log_serializer.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index c2d8802..314adb9 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -172,19 +172,28 @@ def test_serialize_more_than_100_datapoints():
 
 
 def test_serialize_with_more_than_100_metrics_and_datapoints():
+    expected_batches = 11
     datapoints = 295
     metrics = 295
+    expected_results = {}
+    metric_results = {}
 
     context = get_context()
     for index in range(metrics):
         expected_key = f"Metric-{index}"
+        expected_results[expected_key] = []
+        metric_results[expected_key] = []
+
         for i in range(datapoints):
            context.put_metric(expected_key, i)
+            expected_results[expected_key].append(i)
 
     # act
     results = serializer.serialize(context)
 
     # assert
+    assert len(results) == expected_batches
+
     datapoints_count = Counter()
     for batch in results:
         result = json.loads(batch)
@@ -192,10 +201,14 @@ def test_serialize_with_more_than_100_metrics_and_datapoints():
             metric: len(result[metric])
             for metric in result if metric != "_aws"
         })
+        for metric in result:
+            if metric != "_aws":
+                metric_results[metric] += result[metric]
 
     for count in datapoints_count.values():
         assert count == datapoints
     assert len(datapoints_count) == metrics
+    assert metric_results == expected_results
 
 
 def test_serialize_with_multiple_metrics():
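
Not part of the patch series: a standalone sketch showing the behaviour the final serializer is expected to have after PATCH 7/8, namely that a metric with more than 100 datapoints is spread over several EMF events of at most 100 values each. The tests above build their context through their own get_context() helper; this sketch assumes MetricsContext.empty() as the plain way to construct one by hand.

    import json

    from aws_embedded_metrics.logger.metrics_context import MetricsContext
    from aws_embedded_metrics.serializers.log_serializer import LogSerializer

    context = MetricsContext.empty()
    for value in range(295):
        context.put_metric("Latency", value)

    events = LogSerializer().serialize(context)
    # One metric with 295 datapoints should yield 3 events of 100, 100 and 95 values.
    print([len(json.loads(event)["Latency"]) for event in events])  # expected: [100, 100, 95]

Summing those lengths back up, as the Counter-based test does, is what guarantees the slicing never drops a datapoint.
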