diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py
index 115686af..3bc9955f 100644
--- a/datadog_lambda/metric.py
+++ b/datadog_lambda/metric.py
@@ -7,6 +7,7 @@
 import time
 import logging
 import ujson as json
+from datetime import datetime, timedelta
 
 from datadog_lambda.extension import should_use_extension
 from datadog_lambda.tags import get_enhanced_metrics_tags, dd_lambda_layer_tag
@@ -61,6 +62,16 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
     if should_use_extension and timestamp is not None:
         # The extension does not support timestamps for distributions so we create a
         # a thread stats writer to submit metrics with timestamps to the API
+        timestamp_floor = int(
+            (datetime.now() - timedelta(hours=4)).timestamp()
+        )  # oldest timestamp the API accepts: 4 hours ago
+        if timestamp < timestamp_floor:
+            logger.warning(
+                "Timestamp %s is older than 4 hours, not submitting metric %s",
+                timestamp,
+                metric_name,
+            )
+            return
         global extension_thread_stats
         if extension_thread_stats is None:
             from datadog_lambda.thread_stats_writer import ThreadStatsWriter
@@ -108,11 +119,20 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
     )
 
 
-def flush_stats():
+def flush_stats(lambda_context=None):
     lambda_stats.flush()
 
     if extension_thread_stats is not None:
-        extension_thread_stats.flush()
+        tags = []  # always bound, even when no lambda_context is given
+        if lambda_context is not None:
+            tags = get_enhanced_metrics_tags(lambda_context)
+            split_arn = lambda_context.invoked_function_arn.split(":")
+            if len(split_arn) > 7:
+                # Get rid of the alias
+                split_arn.pop()
+            arn = ":".join(split_arn)
+            tags.append("function_arn:" + arn)
+        extension_thread_stats.flush(tags)
 
 
 def submit_enhanced_metric(metric_name, lambda_context):
diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py
index bfcf3c99..367b8b21 100644
--- a/datadog_lambda/thread_stats_writer.py
+++ b/datadog_lambda/thread_stats_writer.py
@@ -22,11 +22,13 @@ def distribution(self, metric_name, value, tags=[], 
timestamp=None):
             metric_name, value, tags=tags, timestamp=timestamp
         )
 
-    def flush(self):
+    def flush(self, tags=None):
         """ "Flush distributions from ThreadStats to Datadog.
         Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
         to gain better control over exception handling.
         """
+        if tags:
+            self.thread_stats.constant_tags = self.thread_stats.constant_tags + [t for t in tags if t not in self.thread_stats.constant_tags]
         _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
         count_dists = len(dists)
         if not count_dists:
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index ba31f2be..9025bfef 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -366,7 +366,7 @@ def _after(self, event, context):
             logger.debug("Failed to create cold start spans. %s", e)
 
         if not self.flush_to_log or should_use_extension:
-            flush_stats()
+            flush_stats(context)
         if should_use_extension and self.local_testing_mode:
             # when testing locally, the extension does not know when an
             # invocation completes because it does not have access to the
diff --git a/tests/test_metric.py b/tests/test_metric.py
index f07a4c6a..031b1180 100644
--- a/tests/test_metric.py
+++ b/tests/test_metric.py
@@ -5,7 +5,7 @@
 
 from botocore.exceptions import ClientError as BotocoreClientError
 from datadog.api.exceptions import ClientError
-
+from datetime import datetime, timedelta
 from datadog_lambda.metric import lambda_metric
 from datadog_lambda.api import decrypt_kms_api_key, KMS_ENCRYPTION_CONTEXT_KEY
@@ -49,12 +49,28 @@ def test_lambda_metric_timestamp_with_extension(self):
         self.mock_metric_extension_thread_stats = patcher.start()
         self.addCleanup(patcher.stop)
 
-        lambda_metric("test_timestamp", 1, 123)
+        delta = timedelta(minutes=1)
+        timestamp = int((datetime.now() - delta).timestamp())
+
+        lambda_metric("test_timestamp", 1, timestamp)
         self.mock_metric_lambda_stats.distribution.assert_not_called()
         self.mock_metric_extension_thread_stats.distribution.assert_called_with(
-            "test_timestamp", 1, timestamp=123, 
tags=[dd_lambda_layer_tag]
+            "test_timestamp", 1, timestamp=timestamp, tags=[dd_lambda_layer_tag]
         )
 
+    @patch("datadog_lambda.metric.should_use_extension", True)
+    def test_lambda_metric_invalid_timestamp_with_extension(self):
+        patcher = patch("datadog_lambda.metric.extension_thread_stats")
+        self.mock_metric_extension_thread_stats = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        delta = timedelta(hours=5)
+        timestamp = int((datetime.now() - delta).timestamp())
+
+        lambda_metric("test_timestamp", 1, timestamp)
+        self.mock_metric_lambda_stats.distribution.assert_not_called()
+        self.mock_metric_extension_thread_stats.distribution.assert_not_called()
+
     def test_lambda_metric_flush_to_log(self):
         os.environ["DD_FLUSH_TO_LOG"] = "True"
@@ -84,6 +100,16 @@ def test_retry_on_remote_disconnected(self):
         lambda_stats.flush()
         self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 2)
 
+    def test_flush_stats_with_tags(self):
+        lambda_stats = ThreadStatsWriter(True)
+        tags = ["tag1:value1", "tag2:value2"]
+        lambda_stats.flush(tags)
+        self.mock_threadstats_flush_distributions.assert_called_once_with(
+            lambda_stats.thread_stats._get_aggregate_metrics_and_dists(float("inf"))[1]
+        )
+        for tag in tags:
+            self.assertTrue(tag in lambda_stats.thread_stats.constant_tags)
+
 MOCK_FUNCTION_NAME = "myFunction"