diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py
new file mode 100644
index 00000000..a30b12d1
--- /dev/null
+++ b/datadog_lambda/api.py
@@ -0,0 +1,90 @@
+import os
+import logging
+import base64
+from datadog_lambda.extension import should_use_extension
+
+logger = logging.getLogger(__name__)
+KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName"
+
+
+def decrypt_kms_api_key(kms_client, ciphertext):
+    """
+    Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS.
+    For this to work properly, the Lambda function must have the appropriate IAM permissions.
+
+    Args:
+        kms_client: The KMS client to use for decryption
+        ciphertext (string): The base64-encoded ciphertext to decrypt
+    """
+    from botocore.exceptions import ClientError
+
+    decoded_bytes = base64.b64decode(ciphertext)
+
+    """
+    When the API key is encrypted using the AWS console, the function name is added as an
+    encryption context. When the API key is encrypted using the AWS CLI, no encryption context
+    is added. We need to try decrypting the API key both with and without the encryption context.
+    """
+    # Try without encryption context, in case the API key was encrypted using the AWS CLI
+    function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
+    try:
+        plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[
+            "Plaintext"
+        ].decode("utf-8")
+    except ClientError:
+        logger.debug(
+            "Failed to decrypt ciphertext without encryption context, "
+            "retrying with encryption context"
+        )
+        # Try with encryption context, in case the API key was encrypted using the AWS Console
+        plaintext = kms_client.decrypt(
+            CiphertextBlob=decoded_bytes,
+            EncryptionContext={
+                KMS_ENCRYPTION_CONTEXT_KEY: function_name,
+            },
+        )["Plaintext"].decode("utf-8")
+
+    return plaintext
+
+
+def init_api():
+    if (
+        not should_use_extension
+        and not os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
+    ):
+        from datadog import api
+
+        if not api._api_key:
+            import boto3
+
+            DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
+            DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
+            DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
+            DD_API_KEY = os.environ.get(
+                "DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")
+            )
+
+            if DD_API_KEY_SECRET_ARN:
+                api._api_key = boto3.client("secretsmanager").get_secret_value(
+                    SecretId=DD_API_KEY_SECRET_ARN
+                )["SecretString"]
+            elif DD_API_KEY_SSM_NAME:
+                api._api_key = boto3.client("ssm").get_parameter(
+                    Name=DD_API_KEY_SSM_NAME, WithDecryption=True
+                )["Parameter"]["Value"]
+            elif DD_KMS_API_KEY:
+                kms_client = boto3.client("kms")
+                api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
+            else:
+                api._api_key = DD_API_KEY
+
+        logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))
+
+        # Set DATADOG_HOST, to send data to a non-default Datadog datacenter
+        api._api_host = os.environ.get(
+            "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
+        )
+        logger.debug("Setting DATADOG_HOST to %s", api._api_host)
+
+        # Unmute exceptions from datadog api client, so we can catch and handle them
+        api._mute = False
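To see the decrypt fallback in isolation: a minimal sketch (not part of this diff; FakeKMSClient and the key material are hypothetical) that exercises decrypt_kms_api_key with a stub mimicking a key encrypted via the AWS Console, where the context-less attempt fails and the retry with EncryptionContext succeeds.

    import base64
    import os

    from botocore.exceptions import ClientError

    from datadog_lambda.api import decrypt_kms_api_key


    class FakeKMSClient:
        # Hypothetical stub: decryption succeeds only when the encryption
        # context is supplied, like a key encrypted via the AWS Console.
        def decrypt(self, CiphertextBlob=None, EncryptionContext=None):
            if EncryptionContext is None:
                raise ClientError(
                    {"Error": {"Code": "InvalidCiphertextException"}}, "Decrypt"
                )
            return {"Plaintext": b"dd-api-key"}


    os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "my-function"
    ciphertext = base64.b64encode(b"irrelevant")  # the stub ignores the payload
    assert decrypt_kms_api_key(FakeKMSClient(), ciphertext) == "dd-api-key"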
+ os.environ.get("DD_SITE", "datadoghq.com") + ) + logger.debug("Setting DATADOG_HOST to %s", api._api_host) + + # Unmute exceptions from datadog api client, so we can catch and handle them + api._mute = False diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 41519b5f..c3e39c97 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -6,124 +6,31 @@ import os import json import time -import base64 import logging -from botocore.exceptions import ClientError -import boto3 -from datadog import api, initialize, statsd -from datadog.threadstats import ThreadStats from datadog_lambda.extension import should_use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer - -KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName" -ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced" +from datadog_lambda.api import init_api logger = logging.getLogger(__name__) lambda_stats = None +init_api() -class StatsWriter: - def distribution(self, metric_name, value, tags=[], timestamp=None): - raise NotImplementedError() - - def flush(self): - raise NotImplementedError() - - def stop(self): - raise NotImplementedError() - - -class StatsDWriter(StatsWriter): - """ - Writes distribution metrics using StatsD protocol - """ - - def __init__(self): - options = {"statsd_host": "127.0.0.1", "statsd_port": 8125} - initialize(**options) - - def distribution(self, metric_name, value, tags=[], timestamp=None): - statsd.distribution(metric_name, value, tags=tags) - - def flush(self): - pass - - def stop(self): - pass - +if should_use_extension: + from datadog_lambda.statsd_writer import StatsDWriter -class ThreadStatsWriter(StatsWriter): - """ - Writes distribution metrics using the ThreadStats class - """ - - def __init__(self, flush_in_thread): - self.thread_stats = ThreadStats(compress_payload=True) - self.thread_stats.start(flush_in_thread=flush_in_thread) - - def distribution(self, metric_name, value, tags=[], timestamp=None): - self.thread_stats.distribution( - metric_name, value, tags=tags, timestamp=timestamp - ) - - def flush(self): - """ "Flush distributions from ThreadStats to Datadog. - Modified based on `datadog.threadstats.base.ThreadStats.flush()`, - to gain better control over exception handling. - """ - _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf")) - count_dists = len(dists) - if not count_dists: - logger.debug("No distributions to flush. Continuing.") + lambda_stats = StatsDWriter() +else: + # Periodical flushing in a background thread is NOT guaranteed to succeed + # and leads to data loss. When disabled, metrics are only flushed at the + # end of invocation. To make metrics submitted from a long-running Lambda + # function available sooner, consider using the Datadog Lambda extension. + from datadog_lambda.thread_stats_writer import ThreadStatsWriter - self.thread_stats.flush_count += 1 - logger.debug( - "Flush #%s sending %s distributions", - self.thread_stats.flush_count, - count_dists, - ) - try: - self.thread_stats.reporter.flush_distributions(dists) - except Exception as e: - # The nature of the root issue https://bugs.python.org/issue41345 is complex, - # but comprehensive tests suggest that it is safe to retry on this specific error. 
-            if isinstance(
-                e, api.exceptions.ClientError
-            ) and "RemoteDisconnected" in str(e):
-                logger.debug(
-                    "Retry flush #%s due to RemoteDisconnected",
-                    self.thread_stats.flush_count,
-                )
-                try:
-                    self.thread_stats.reporter.flush_distributions(dists)
-                except Exception:
-                    logger.debug(
-                        "Flush #%s failed after retry",
-                        self.thread_stats.flush_count,
-                        exc_info=True,
-                    )
-            else:
-                logger.debug(
-                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
-                )
-
-    def stop(self):
-        self.thread_stats.stop()
-
-
-def init_lambda_stats():
-    global lambda_stats
-    if should_use_extension:
-        lambda_stats = StatsDWriter()
-    else:
-        # Periodical flushing in a background thread is NOT guaranteed to succeed
-        # and leads to data loss. When disabled, metrics are only flushed at the
-        # end of invocation. To make metrics submitted from a long-running Lambda
-        # function available sooner, consider using the Datadog Lambda extension.
-        flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
-        lambda_stats = ThreadStatsWriter(flush_in_thread)
+    flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
+    lambda_stats = ThreadStatsWriter(flush_in_thread)
 
 
 def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
@@ -138,16 +45,24 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
     Otherwise, the metrics will be submitted to the Datadog API
     periodically and at the end of the function execution in a
     background thread.
+
+    Note that if the extension is present, it will override the DD_FLUSH_TO_LOG
+    value and always send metrics to the extension.
     """
-    global lambda_stats
     flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
     tags = tag_dd_lambda_layer(tags)
 
-    if flush_to_logs or (force_async and not should_use_extension):
-        write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
-    else:
-        logger.debug("Sending metric %s to Datadog via lambda layer", metric_name)
+    if should_use_extension:
         lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
+    else:
+        if flush_to_logs or force_async:
+            write_metric_point_to_stdout(
+                metric_name, value, timestamp=timestamp, tags=tags
+            )
+        else:
+            lambda_stats.distribution(
+                metric_name, value, tags=tags, timestamp=timestamp
+            )
 
 
 def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
@@ -168,7 +83,6 @@
 
 
 def flush_stats():
-    global lambda_stats
     lambda_stats.flush()
 
 
@@ -217,74 +131,3 @@ def submit_errors_metric(lambda_context):
         lambda_context (dict): Lambda context dict passed to the function by AWS
     """
     submit_enhanced_metric("errors", lambda_context)
-
-
-def decrypt_kms_api_key(kms_client, ciphertext):
-    """
-    Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS.
-    For this to work properly, the Lambda function must have the appropriate IAM permissions.
-
-    Args:
-        kms_client: The KMS client to use for decryption
-        ciphertext (string): The base64-encoded ciphertext to decrypt
-    """
-    decoded_bytes = base64.b64decode(ciphertext)
-
-    """
-    When the API key is encrypted using the AWS console, the function name is added as an
-    encryption context. When the API key is encrypted using the AWS CLI, no encryption context
-    is added. We need to try decrypting the API key both with and without the encryption context.
-    """
-    # Try without encryption context, in case API key was encrypted using the AWS CLI
-    function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
-    try:
-        plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[
-            "Plaintext"
-        ].decode("utf-8")
-    except ClientError:
-        logger.debug(
-            "Failed to decrypt ciphertext without encryption context, \
-                retrying with encryption context"
-        )
-        # Try with encryption context, in case API key was encrypted using the AWS Console
-        plaintext = kms_client.decrypt(
-            CiphertextBlob=decoded_bytes,
-            EncryptionContext={
-                KMS_ENCRYPTION_CONTEXT_KEY: function_name,
-            },
-        )["Plaintext"].decode("utf-8")
-
-    return plaintext
-
-
-# Set API Key
-if not api._api_key:
-    DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
-    DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
-    DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
-    DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))
-
-    if DD_API_KEY_SECRET_ARN:
-        api._api_key = boto3.client("secretsmanager").get_secret_value(
-            SecretId=DD_API_KEY_SECRET_ARN
-        )["SecretString"]
-    elif DD_API_KEY_SSM_NAME:
-        api._api_key = boto3.client("ssm").get_parameter(
-            Name=DD_API_KEY_SSM_NAME, WithDecryption=True
-        )["Parameter"]["Value"]
-    elif DD_KMS_API_KEY:
-        kms_client = boto3.client("kms")
-        api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
-    else:
-        api._api_key = DD_API_KEY
-
-logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))
-
-# Set DATADOG_HOST, to send data to a non-default Datadog datacenter
-api._api_host = os.environ.get(
-    "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
-)
-logger.debug("Setting DATADOG_HOST to %s", api._api_host)
-
-# Unmute exceptions from datadog api client, so we can catch and handle them
-api._mute = False
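The new branching in lambda_metric reduces to a small decision table. A sketch (not part of this diff; metric_route is a hypothetical name) showing that extension presence wins over both DD_FLUSH_TO_LOG and force_async:

    def metric_route(should_use_extension, flush_to_logs, force_async):
        if should_use_extension:
            return "extension"  # StatsD to the local extension, flags ignored
        if flush_to_logs or force_async:
            return "stdout"  # log line for the Datadog Forwarder to pick up
        return "api"  # buffered in ThreadStats, flushed to the Datadog API


    assert metric_route(True, True, True) == "extension"
    assert metric_route(False, True, False) == "stdout"
    assert metric_route(False, False, False) == "api"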
- """ - # Try without encryption context, in case API key was encrypted using the AWS CLI - function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") - try: - plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[ - "Plaintext" - ].decode("utf-8") - except ClientError: - logger.debug( - "Failed to decrypt ciphertext without encryption context, \ - retrying with encryption context" - ) - # Try with encryption context, in case API key was encrypted using the AWS Console - plaintext = kms_client.decrypt( - CiphertextBlob=decoded_bytes, - EncryptionContext={ - KMS_ENCRYPTION_CONTEXT_KEY: function_name, - }, - )["Plaintext"].decode("utf-8") - - return plaintext - - -# Set API Key -if not api._api_key: - DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") - DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") - DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") - DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")) - - if DD_API_KEY_SECRET_ARN: - api._api_key = boto3.client("secretsmanager").get_secret_value( - SecretId=DD_API_KEY_SECRET_ARN - )["SecretString"] - elif DD_API_KEY_SSM_NAME: - api._api_key = boto3.client("ssm").get_parameter( - Name=DD_API_KEY_SSM_NAME, WithDecryption=True - )["Parameter"]["Value"] - elif DD_KMS_API_KEY: - kms_client = boto3.client("kms") - api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY) - else: - api._api_key = DD_API_KEY - -logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key)) - -# Set DATADOG_HOST, to send data to a non-default Datadog datacenter -api._api_host = os.environ.get( - "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com") -) -logger.debug("Setting DATADOG_HOST to %s", api._api_host) - -# Unmute exceptions from datadog api client, so we can catch and handle them -api._mute = False diff --git a/datadog_lambda/stats_writer.py b/datadog_lambda/stats_writer.py new file mode 100644 index 00000000..d3919c30 --- /dev/null +++ b/datadog_lambda/stats_writer.py @@ -0,0 +1,9 @@ +class StatsWriter: + def distribution(self, metric_name, value, tags=[], timestamp=None): + raise NotImplementedError() + + def flush(self): + raise NotImplementedError() + + def stop(self): + raise NotImplementedError() diff --git a/datadog_lambda/statsd_writer.py b/datadog_lambda/statsd_writer.py new file mode 100644 index 00000000..cd849e67 --- /dev/null +++ b/datadog_lambda/statsd_writer.py @@ -0,0 +1,21 @@ +from datadog_lambda.stats_writer import StatsWriter +from datadog import initialize, statsd + + +class StatsDWriter(StatsWriter): + """ + Writes distribution metrics using StatsD protocol + """ + + def __init__(self): + options = {"statsd_host": "127.0.0.1", "statsd_port": 8125} + initialize(**options) + + def distribution(self, metric_name, value, tags=[], timestamp=None): + statsd.distribution(metric_name, value, tags=tags) + + def flush(self): + pass + + def stop(self): + pass diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py new file mode 100644 index 00000000..91ffcd45 --- /dev/null +++ b/datadog_lambda/thread_stats_writer.py @@ -0,0 +1,62 @@ +import logging +from datadog.threadstats import ThreadStats +from datadog_lambda.stats_writer import StatsWriter + +logger = logging.getLogger(__name__) + + +class ThreadStatsWriter(StatsWriter): + """ + Writes distribution metrics using the ThreadStats class + """ + + def __init__(self, flush_in_thread): + self.thread_stats = ThreadStats(compress_payload=True) + 
diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py
new file mode 100644
index 00000000..91ffcd45
--- /dev/null
+++ b/datadog_lambda/thread_stats_writer.py
@@ -0,0 +1,62 @@
+import logging
+from datadog.threadstats import ThreadStats
+from datadog_lambda.stats_writer import StatsWriter
+
+logger = logging.getLogger(__name__)
+
+
+class ThreadStatsWriter(StatsWriter):
+    """
+    Writes distribution metrics using the ThreadStats class
+    """
+
+    def __init__(self, flush_in_thread):
+        self.thread_stats = ThreadStats(compress_payload=True)
+        self.thread_stats.start(flush_in_thread=flush_in_thread)
+
+    def distribution(self, metric_name, value, tags=[], timestamp=None):
+        self.thread_stats.distribution(
+            metric_name, value, tags=tags, timestamp=timestamp
+        )
+
+    def flush(self):
+        """Flush distributions from ThreadStats to Datadog.
+        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
+        to gain better control over exception handling.
+        """
+        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
+        count_dists = len(dists)
+        if not count_dists:
+            logger.debug("No distributions to flush. Continuing.")
+
+        self.thread_stats.flush_count += 1
+        logger.debug(
+            "Flush #%s sending %s distributions",
+            self.thread_stats.flush_count,
+            count_dists,
+        )
+        try:
+            self.thread_stats.reporter.flush_distributions(dists)
+        except Exception as e:
+            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
+            # but comprehensive tests suggest that it is safe to retry on this specific error.
+            if type(e).__name__ == "ClientError" and "RemoteDisconnected" in str(e):
+                logger.debug(
+                    "Retry flush #%s due to RemoteDisconnected",
+                    self.thread_stats.flush_count,
+                )
+                try:
+                    self.thread_stats.reporter.flush_distributions(dists)
+                except Exception:
+                    logger.debug(
+                        "Flush #%s failed after retry",
+                        self.thread_stats.flush_count,
+                        exc_info=True,
+                    )
+            else:
+                logger.debug(
+                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
+                )
+
+    def stop(self):
+        self.thread_stats.stop()
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 67c92e8c..533c28da 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -12,7 +12,6 @@
 from datadog_lambda.cold_start import set_cold_start, is_cold_start
 from datadog_lambda.constants import XraySubsegment, TraceContextSource
 from datadog_lambda.metric import (
-    init_lambda_stats,
     flush_stats,
     submit_invocations_metric,
     submit_errors_metric,
@@ -120,7 +119,6 @@ def __init__(self, func):
 
     def __call__(self, event, context, **kwargs):
         """Executes when the wrapped function gets called"""
-        init_lambda_stats()
         self._before(event, context)
         try:
             self.response = self.func(event, context, **kwargs)
diff --git a/tests/test_metric.py b/tests/test_metric.py
index d22cf881..8f896481 100644
--- a/tests/test_metric.py
+++ b/tests/test_metric.py
@@ -8,12 +8,11 @@
 from botocore.exceptions import ClientError as BotocoreClientError
 from datadog.api.exceptions import ClientError
 
-from datadog_lambda.metric import (
-    decrypt_kms_api_key,
-    lambda_metric,
-    ThreadStatsWriter,
-    KMS_ENCRYPTION_CONTEXT_KEY,
-)
+
+
+from datadog_lambda.metric import lambda_metric
+from datadog_lambda.api import decrypt_kms_api_key, KMS_ENCRYPTION_CONTEXT_KEY
+from datadog_lambda.thread_stats_writer import ThreadStatsWriter
 from datadog_lambda.tags import _format_dd_lambda_layer_tag
 
 
@@ -36,6 +35,17 @@ def test_lambda_metric_tagged_with_dd_lambda_layer(self):
             ]
         )
 
+    # Fake the extension being present; this should override DD_FLUSH_TO_LOG
+    @patch("datadog_lambda.metric.should_use_extension", True)
+    def test_lambda_metric_flush_to_log_with_extension(self):
+        os.environ["DD_FLUSH_TO_LOG"] = "True"
+        lambda_metric("test", 1)
+        expected_tag = _format_dd_lambda_layer_tag()
+        self.mock_metric_lambda_stats.distribution.assert_has_calls(
+            [call("test", 1, timestamp=None, tags=[expected_tag])]
+        )
+        del os.environ["DD_FLUSH_TO_LOG"]
+
     def test_lambda_metric_flush_to_log(self):
         os.environ["DD_FLUSH_TO_LOG"] = "True"
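With init_lambda_stats() removed, tests that need a specific flushing mode swap the module-level writer directly. A minimal sketch of the pattern (mirroring the tests/test_wrapper.py changes below):

    import datadog_lambda.metric as metric_module
    from datadog_lambda.thread_stats_writer import ThreadStatsWriter

    metric_module.lambda_stats.stop()
    metric_module.lambda_stats = ThreadStatsWriter(flush_in_thread=True)
    try:
        pass  # invoke the wrapped handler under test here
    finally:
        # restore the default non-threaded writer
        metric_module.lambda_stats.stop()
        metric_module.lambda_stats = ThreadStatsWriter(flush_in_thread=False)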
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index cfb04ff1..f7190276 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -7,7 +7,8 @@
 from mock import patch, call, ANY, MagicMock
 
 from datadog_lambda.wrapper import datadog_lambda_wrapper
-from datadog_lambda.metric import lambda_metric, ThreadStatsWriter
+from datadog_lambda.metric import lambda_metric
+from datadog_lambda.thread_stats_writer import ThreadStatsWriter
 
 
 def get_mock_context(
@@ -133,11 +134,10 @@ def lambda_handler(event, context):
 
     def test_datadog_lambda_wrapper_flush_in_thread(self):
         # force ThreadStats to flush in thread
-        os.environ["DD_FLUSH_IN_THREAD"] = "True"
         import datadog_lambda.metric as metric_module
 
         metric_module.lambda_stats.stop()
-        metric_module.init_lambda_stats()
+        metric_module.lambda_stats = ThreadStatsWriter(True)
 
         @datadog_lambda_wrapper
         def lambda_handler(event, context):
@@ -158,7 +158,6 @@ def lambda_handler(event, context):
         # reset ThreadStats
         metric_module.lambda_stats.stop()
         metric_module.lambda_stats = ThreadStatsWriter(False)
-        del os.environ["DD_FLUSH_IN_THREAD"]
 
     def test_datadog_lambda_wrapper_not_flush_in_thread(self):
         # force ThreadStats to not flush in thread
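To assert the extension override end to end without an extension installed, a test can patch the flag and the writer, in the same spirit as test_lambda_metric_flush_to_log_with_extension above (a sketch, assuming the mock package these tests already use):

    import os
    from mock import patch

    from datadog_lambda.metric import lambda_metric

    os.environ["DD_FLUSH_TO_LOG"] = "True"
    with patch("datadog_lambda.metric.should_use_extension", True), patch(
        "datadog_lambda.metric.lambda_stats"
    ) as mock_stats:
        lambda_metric("test.metric", 1)
        # the extension path was taken despite DD_FLUSH_TO_LOG
        mock_stats.distribution.assert_called_once()
    del os.environ["DD_FLUSH_TO_LOG"]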