From d89c9e6c80d0805721e3787b61ed1f7ce619b29b Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 17 Aug 2021 14:32:04 -0400 Subject: [PATCH 01/11] Split file to enable lazy-load --- datadog_lambda/metric.py | 157 ++++++-------------------- datadog_lambda/stats_writer.py | 9 ++ datadog_lambda/statsd_writer.py | 20 ++++ datadog_lambda/thread_stats_writer.py | 58 ++++++++++ 4 files changed, 122 insertions(+), 122 deletions(-) create mode 100644 datadog_lambda/stats_writer.py create mode 100644 datadog_lambda/statsd_writer.py create mode 100644 datadog_lambda/thread_stats_writer.py diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 41519b5f..5dbfb330 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -10,9 +10,6 @@ import logging from botocore.exceptions import ClientError -import boto3 -from datadog import api, initialize, statsd -from datadog.threadstats import ThreadStats from datadog_lambda.extension import should_use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer @@ -23,101 +20,13 @@ lambda_stats = None - -class StatsWriter: - def distribution(self, metric_name, value, tags=[], timestamp=None): - raise NotImplementedError() - - def flush(self): - raise NotImplementedError() - - def stop(self): - raise NotImplementedError() - - -class StatsDWriter(StatsWriter): - """ - Writes distribution metrics using StatsD protocol - """ - - def __init__(self): - options = {"statsd_host": "127.0.0.1", "statsd_port": 8125} - initialize(**options) - - def distribution(self, metric_name, value, tags=[], timestamp=None): - statsd.distribution(metric_name, value, tags=tags) - - def flush(self): - pass - - def stop(self): - pass - - -class ThreadStatsWriter(StatsWriter): - """ - Writes distribution metrics using the ThreadStats class - """ - - def __init__(self, flush_in_thread): - self.thread_stats = ThreadStats(compress_payload=True) - self.thread_stats.start(flush_in_thread=flush_in_thread) - - def distribution(self, metric_name, value, tags=[], timestamp=None): - self.thread_stats.distribution( - metric_name, value, tags=tags, timestamp=timestamp - ) - - def flush(self): - """ "Flush distributions from ThreadStats to Datadog. - Modified based on `datadog.threadstats.base.ThreadStats.flush()`, - to gain better control over exception handling. - """ - _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf")) - count_dists = len(dists) - if not count_dists: - logger.debug("No distributions to flush. Continuing.") - - self.thread_stats.flush_count += 1 - logger.debug( - "Flush #%s sending %s distributions", - self.thread_stats.flush_count, - count_dists, - ) - try: - self.thread_stats.reporter.flush_distributions(dists) - except Exception as e: - # The nature of the root issue https://bugs.python.org/issue41345 is complex, - # but comprehensive tests suggest that it is safe to retry on this specific error. 
- if isinstance( - e, api.exceptions.ClientError - ) and "RemoteDisconnected" in str(e): - logger.debug( - "Retry flush #%s due to RemoteDisconnected", - self.thread_stats.flush_count, - ) - try: - self.thread_stats.reporter.flush_distributions(dists) - except Exception: - logger.debug( - "Flush #%s failed after retry", - self.thread_stats.flush_count, - exc_info=True, - ) - else: - logger.debug( - "Flush #%s failed", self.thread_stats.flush_count, exc_info=True - ) - - def stop(self): - self.thread_stats.stop() - - def init_lambda_stats(): global lambda_stats if should_use_extension: + from datadog_lambda.statsd_writer import StatsDWriter lambda_stats = StatsDWriter() else: + from datadog_lambda.thread_stats_writer import ThreadStatsWriter # Periodical flushing in a background thread is NOT guaranteed to succeed # and leads to data loss. When disabled, metrics are only flushed at the # end of invocation. To make metrics submitted from a long-running Lambda @@ -257,34 +166,38 @@ def decrypt_kms_api_key(kms_client, ciphertext): return plaintext -# Set API Key -if not api._api_key: - DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") - DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") - DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") - DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")) - - if DD_API_KEY_SECRET_ARN: - api._api_key = boto3.client("secretsmanager").get_secret_value( - SecretId=DD_API_KEY_SECRET_ARN - )["SecretString"] - elif DD_API_KEY_SSM_NAME: - api._api_key = boto3.client("ssm").get_parameter( - Name=DD_API_KEY_SSM_NAME, WithDecryption=True - )["Parameter"]["Value"] - elif DD_KMS_API_KEY: - kms_client = boto3.client("kms") - api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY) - else: - api._api_key = DD_API_KEY - -logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key)) -# Set DATADOG_HOST, to send data to a non-default Datadog datacenter -api._api_host = os.environ.get( - "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com") -) -logger.debug("Setting DATADOG_HOST to %s", api._api_host) +# Set API Key only if extension is not here +if not should_use_extension: + from datadog import api + if not api._api_key: + import boto3 + DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") + DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") + DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") + DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")) + + if DD_API_KEY_SECRET_ARN: + api._api_key = boto3.client("secretsmanager").get_secret_value( + SecretId=DD_API_KEY_SECRET_ARN + )["SecretString"] + elif DD_API_KEY_SSM_NAME: + api._api_key = boto3.client("ssm").get_parameter( + Name=DD_API_KEY_SSM_NAME, WithDecryption=True + )["Parameter"]["Value"] + elif DD_KMS_API_KEY: + kms_client = boto3.client("kms") + api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY) + else: + api._api_key = DD_API_KEY + + logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key)) + + # Set DATADOG_HOST, to send data to a non-default Datadog datacenter + api._api_host = os.environ.get( + "DATADOG_HOST", "https://api." 
+ os.environ.get("DD_SITE", "datadoghq.com") + ) + logger.debug("Setting DATADOG_HOST to %s", api._api_host) -# Unmute exceptions from datadog api client, so we can catch and handle them -api._mute = False + # Unmute exceptions from datadog api client, so we can catch and handle them + api._mute = False \ No newline at end of file diff --git a/datadog_lambda/stats_writer.py b/datadog_lambda/stats_writer.py new file mode 100644 index 00000000..0891d3de --- /dev/null +++ b/datadog_lambda/stats_writer.py @@ -0,0 +1,9 @@ +class StatsWriter: + def distribution(self, metric_name, value, tags=[], timestamp=None): + raise NotImplementedError() + + def flush(self): + raise NotImplementedError() + + def stop(self): + raise NotImplementedError() \ No newline at end of file diff --git a/datadog_lambda/statsd_writer.py b/datadog_lambda/statsd_writer.py new file mode 100644 index 00000000..4b14c48c --- /dev/null +++ b/datadog_lambda/statsd_writer.py @@ -0,0 +1,20 @@ +from datadog_lambda.stats_writer import StatsWriter +from datadog import initialize, statsd + +class StatsDWriter(StatsWriter): + """ + Writes distribution metrics using StatsD protocol + """ + + def __init__(self): + options = {"statsd_host": "127.0.0.1", "statsd_port": 8125} + initialize(**options) + + def distribution(self, metric_name, value, tags=[], timestamp=None): + statsd.distribution(metric_name, value, tags=tags) + + def flush(self): + pass + + def stop(self): + pass \ No newline at end of file diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py new file mode 100644 index 00000000..7cb4ab73 --- /dev/null +++ b/datadog_lambda/thread_stats_writer.py @@ -0,0 +1,58 @@ +from datadog.threadstats import ThreadStats +from datadog_lambda.stats_writer import StatsWriter + +class ThreadStatsWriter(StatsWriter): + """ + Writes distribution metrics using the ThreadStats class + """ + + def __init__(self, flush_in_thread): + self.thread_stats = ThreadStats(compress_payload=True) + self.thread_stats.start(flush_in_thread=flush_in_thread) + + def distribution(self, metric_name, value, tags=[], timestamp=None): + self.thread_stats.distribution( + metric_name, value, tags=tags, timestamp=timestamp + ) + + def flush(self): + """ "Flush distributions from ThreadStats to Datadog. + Modified based on `datadog.threadstats.base.ThreadStats.flush()`, + to gain better control over exception handling. + """ + _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf")) + count_dists = len(dists) + if not count_dists: + logger.debug("No distributions to flush. Continuing.") + + self.thread_stats.flush_count += 1 + logger.debug( + "Flush #%s sending %s distributions", + self.thread_stats.flush_count, + count_dists, + ) + try: + self.thread_stats.reporter.flush_distributions(dists) + except Exception as e: + # The nature of the root issue https://bugs.python.org/issue41345 is complex, + # but comprehensive tests suggest that it is safe to retry on this specific error. 
+ if type(e).__name__ == "ClientError" and "RemoteDisconnected" in str(e): + logger.debug( + "Retry flush #%s due to RemoteDisconnected", + self.thread_stats.flush_count, + ) + try: + self.thread_stats.reporter.flush_distributions(dists) + except Exception: + logger.debug( + "Flush #%s failed after retry", + self.thread_stats.flush_count, + exc_info=True, + ) + else: + logger.debug( + "Flush #%s failed", self.thread_stats.flush_count, exc_info=True + ) + + def stop(self): + self.thread_stats.stop() \ No newline at end of file From 2b8fa008e3cfbaea5a211f69fe3048dcf4f41a69 Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 17 Aug 2021 15:01:37 -0400 Subject: [PATCH 02/11] tests and lint --- datadog_lambda/metric.py | 4 ++-- datadog_lambda/stats_writer.py | 2 +- datadog_lambda/statsd_writer.py | 3 ++- datadog_lambda/thread_stats_writer.py | 6 +++++- tests/test_metric.py | 2 +- tests/test_wrapper.py | 3 ++- 6 files changed, 13 insertions(+), 7 deletions(-) diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 5dbfb330..8e4d50e9 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -20,6 +20,7 @@ lambda_stats = None + def init_lambda_stats(): global lambda_stats if should_use_extension: @@ -166,7 +167,6 @@ def decrypt_kms_api_key(kms_client, ciphertext): return plaintext - # Set API Key only if extension is not here if not should_use_extension: from datadog import api @@ -200,4 +200,4 @@ def decrypt_kms_api_key(kms_client, ciphertext): logger.debug("Setting DATADOG_HOST to %s", api._api_host) # Unmute exceptions from datadog api client, so we can catch and handle them - api._mute = False \ No newline at end of file + api._mute = False diff --git a/datadog_lambda/stats_writer.py b/datadog_lambda/stats_writer.py index 0891d3de..d3919c30 100644 --- a/datadog_lambda/stats_writer.py +++ b/datadog_lambda/stats_writer.py @@ -6,4 +6,4 @@ def flush(self): raise NotImplementedError() def stop(self): - raise NotImplementedError() \ No newline at end of file + raise NotImplementedError() diff --git a/datadog_lambda/statsd_writer.py b/datadog_lambda/statsd_writer.py index 4b14c48c..cd849e67 100644 --- a/datadog_lambda/statsd_writer.py +++ b/datadog_lambda/statsd_writer.py @@ -1,6 +1,7 @@ from datadog_lambda.stats_writer import StatsWriter from datadog import initialize, statsd + class StatsDWriter(StatsWriter): """ Writes distribution metrics using StatsD protocol @@ -17,4 +18,4 @@ def flush(self): pass def stop(self): - pass \ No newline at end of file + pass diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py index 7cb4ab73..91ffcd45 100644 --- a/datadog_lambda/thread_stats_writer.py +++ b/datadog_lambda/thread_stats_writer.py @@ -1,6 +1,10 @@ +import logging from datadog.threadstats import ThreadStats from datadog_lambda.stats_writer import StatsWriter +logger = logging.getLogger(__name__) + + class ThreadStatsWriter(StatsWriter): """ Writes distribution metrics using the ThreadStats class @@ -55,4 +59,4 @@ def flush(self): ) def stop(self): - self.thread_stats.stop() \ No newline at end of file + self.thread_stats.stop() diff --git a/tests/test_metric.py b/tests/test_metric.py index d22cf881..2caf4a5f 100644 --- a/tests/test_metric.py +++ b/tests/test_metric.py @@ -11,9 +11,9 @@ from datadog_lambda.metric import ( decrypt_kms_api_key, lambda_metric, - ThreadStatsWriter, KMS_ENCRYPTION_CONTEXT_KEY, ) +from datadog_lambda.thread_stats_writer import ThreadStatsWriter from datadog_lambda.tags import _format_dd_lambda_layer_tag diff 
--git a/tests/test_wrapper.py b/tests/test_wrapper.py index cfb04ff1..5995cad7 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -7,7 +7,8 @@ from mock import patch, call, ANY, MagicMock from datadog_lambda.wrapper import datadog_lambda_wrapper -from datadog_lambda.metric import lambda_metric, ThreadStatsWriter +from datadog_lambda.metric import lambda_metric +from datadog_lambda.thread_stats_writer import ThreadStatsWriter def get_mock_context( From 39381486076aa1bc2a3e50ecbb972a6ca8cd3ccc Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 17 Aug 2021 15:22:39 -0400 Subject: [PATCH 03/11] lint --- datadog_lambda/metric.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 8e4d50e9..2a04f13b 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -25,9 +25,11 @@ def init_lambda_stats(): global lambda_stats if should_use_extension: from datadog_lambda.statsd_writer import StatsDWriter + lambda_stats = StatsDWriter() else: from datadog_lambda.thread_stats_writer import ThreadStatsWriter + # Periodical flushing in a background thread is NOT guaranteed to succeed # and leads to data loss. When disabled, metrics are only flushed at the # end of invocation. To make metrics submitted from a long-running Lambda @@ -170,8 +172,10 @@ def decrypt_kms_api_key(kms_client, ciphertext): # Set API Key only if extension is not here if not should_use_extension: from datadog import api + if not api._api_key: import boto3 + DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") From 6c65191bc60e7f1e32009707662dcffcb92d01a0 Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 14:08:26 -0400 Subject: [PATCH 04/11] pr review --- datadog_lambda/__init__.py | 3 + datadog_lambda/api.py | 83 +++++++++++++++++++++++++++ datadog_lambda/metric.py | 111 ++++--------------------------------- datadog_lambda/wrapper.py | 2 - tests/test_metric.py | 6 +- tests/test_wrapper.py | 4 +- 6 files changed, 101 insertions(+), 108 deletions(-) create mode 100644 datadog_lambda/api.py diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 1ef94fb9..d92fb2a6 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -5,6 +5,9 @@ import os import logging +from datadog_lambda.api import init_api logger = logging.getLogger(__name__) logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) + +init_api() diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py new file mode 100644 index 00000000..d78192ca --- /dev/null +++ b/datadog_lambda/api.py @@ -0,0 +1,83 @@ +import os +import logging +import base64 + +logger = logging.getLogger(__name__) +KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName" + + +def decrypt_kms_api_key(kms_client, ciphertext): + from botocore.exceptions import ClientError + """ + Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS. + For this to work properly, the Lambda function must have the appropriate IAM permissions. + + Args: + kms_client: The KMS client to use for decryption + ciphertext (string): The base64-encoded ciphertext to decrypt + """ + decoded_bytes = base64.b64decode(ciphertext) + + """ + When the API key is encrypted using the AWS console, the function name is added as an + encryption context. When the API key is encrypted using the AWS CLI, no encryption context + is added. 
We need to try decrypting the API key both with and without the encryption context. + """ + # Try without encryption context, in case API key was encrypted using the AWS CLI + function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") + try: + plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[ + "Plaintext" + ].decode("utf-8") + except ClientError: + logger.debug( + "Failed to decrypt ciphertext without encryption context, \ + retrying with encryption context" + ) + # Try with encryption context, in case API key was encrypted using the AWS Console + plaintext = kms_client.decrypt( + CiphertextBlob=decoded_bytes, + EncryptionContext={ + KMS_ENCRYPTION_CONTEXT_KEY: function_name, + }, + )["Plaintext"].decode("utf-8") + + return plaintext + + +def init_api(): + if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "false": + from datadog import api + + if not api._api_key: + import boto3 + + DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") + DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") + DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") + DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")) + + if DD_API_KEY_SECRET_ARN: + api._api_key = boto3.client("secretsmanager").get_secret_value( + SecretId=DD_API_KEY_SECRET_ARN + )["SecretString"] + elif DD_API_KEY_SSM_NAME: + api._api_key = boto3.client("ssm").get_parameter( + Name=DD_API_KEY_SSM_NAME, WithDecryption=True + )["Parameter"]["Value"] + elif DD_KMS_API_KEY: + kms_client = boto3.client("kms") + api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY) + else: + api._api_key = DD_API_KEY + + logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key)) + + # Set DATADOG_HOST, to send data to a non-default Datadog datacenter + api._api_host = os.environ.get( + "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com") + ) + logger.debug("Setting DATADOG_HOST to %s", api._api_host) + + # Unmute exceptions from datadog api client, so we can catch and handle them + api._mute = False diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 2a04f13b..94f6142f 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -6,36 +6,25 @@ import os import json import time -import base64 import logging -from botocore.exceptions import ClientError from datadog_lambda.extension import should_use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer -KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName" -ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced" - logger = logging.getLogger(__name__) lambda_stats = None - - -def init_lambda_stats(): - global lambda_stats - if should_use_extension: - from datadog_lambda.statsd_writer import StatsDWriter - - lambda_stats = StatsDWriter() - else: - from datadog_lambda.thread_stats_writer import ThreadStatsWriter - - # Periodical flushing in a background thread is NOT guaranteed to succeed - # and leads to data loss. When disabled, metrics are only flushed at the - # end of invocation. To make metrics submitted from a long-running Lambda - # function available sooner, consider using the Datadog Lambda extension. 
- flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" - lambda_stats = ThreadStatsWriter(flush_in_thread) +if should_use_extension: + from datadog_lambda.statsd_writer import StatsDWriter + lambda_stats = StatsDWriter() +else: + # Periodical flushing in a background thread is NOT guaranteed to succeed + # and leads to data loss. When disabled, metrics are only flushed at the + # end of invocation. To make metrics submitted from a long-running Lambda + # function available sooner, consider using the Datadog Lambda extension. + from datadog_lambda.thread_stats_writer import ThreadStatsWriter + flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" + lambda_stats = ThreadStatsWriter(flush_in_thread) def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False): @@ -51,7 +40,6 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal periodically and at the end of the function execution in a background thread. """ - global lambda_stats flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" tags = tag_dd_lambda_layer(tags) @@ -80,7 +68,6 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]): def flush_stats(): - global lambda_stats lambda_stats.flush() @@ -129,79 +116,3 @@ def submit_errors_metric(lambda_context): lambda_context (dict): Lambda context dict passed to the function by AWS """ submit_enhanced_metric("errors", lambda_context) - - -def decrypt_kms_api_key(kms_client, ciphertext): - """ - Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS. - For this to work properly, the Lambda function must have the appropriate IAM permissions. - - Args: - kms_client: The KMS client to use for decryption - ciphertext (string): The base64-encoded ciphertext to decrypt - """ - decoded_bytes = base64.b64decode(ciphertext) - - """ - When the API key is encrypted using the AWS console, the function name is added as an - encryption context. When the API key is encrypted using the AWS CLI, no encryption context - is added. We need to try decrypting the API key both with and without the encryption context. 
- """ - # Try without encryption context, in case API key was encrypted using the AWS CLI - function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") - try: - plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[ - "Plaintext" - ].decode("utf-8") - except ClientError: - logger.debug( - "Failed to decrypt ciphertext without encryption context, \ - retrying with encryption context" - ) - # Try with encryption context, in case API key was encrypted using the AWS Console - plaintext = kms_client.decrypt( - CiphertextBlob=decoded_bytes, - EncryptionContext={ - KMS_ENCRYPTION_CONTEXT_KEY: function_name, - }, - )["Plaintext"].decode("utf-8") - - return plaintext - - -# Set API Key only if extension is not here -if not should_use_extension: - from datadog import api - - if not api._api_key: - import boto3 - - DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") - DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") - DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") - DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")) - - if DD_API_KEY_SECRET_ARN: - api._api_key = boto3.client("secretsmanager").get_secret_value( - SecretId=DD_API_KEY_SECRET_ARN - )["SecretString"] - elif DD_API_KEY_SSM_NAME: - api._api_key = boto3.client("ssm").get_parameter( - Name=DD_API_KEY_SSM_NAME, WithDecryption=True - )["Parameter"]["Value"] - elif DD_KMS_API_KEY: - kms_client = boto3.client("kms") - api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY) - else: - api._api_key = DD_API_KEY - - logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key)) - - # Set DATADOG_HOST, to send data to a non-default Datadog datacenter - api._api_host = os.environ.get( - "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com") - ) - logger.debug("Setting DATADOG_HOST to %s", api._api_host) - - # Unmute exceptions from datadog api client, so we can catch and handle them - api._mute = False diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 67c92e8c..533c28da 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -12,7 +12,6 @@ from datadog_lambda.cold_start import set_cold_start, is_cold_start from datadog_lambda.constants import XraySubsegment, TraceContextSource from datadog_lambda.metric import ( - init_lambda_stats, flush_stats, submit_invocations_metric, submit_errors_metric, @@ -120,7 +119,6 @@ def __init__(self, func): def __call__(self, event, context, **kwargs): """Executes when the wrapped function gets called""" - init_lambda_stats() self._before(event, context) try: self.response = self.func(event, context, **kwargs) diff --git a/tests/test_metric.py b/tests/test_metric.py index 2caf4a5f..21088651 100644 --- a/tests/test_metric.py +++ b/tests/test_metric.py @@ -8,10 +8,10 @@ from botocore.exceptions import ClientError as BotocoreClientError from datadog.api.exceptions import ClientError -from datadog_lambda.metric import ( +from datadog_lambda.metric import lambda_metric +from datadog_lambda.api import ( decrypt_kms_api_key, - lambda_metric, - KMS_ENCRYPTION_CONTEXT_KEY, + KMS_ENCRYPTION_CONTEXT_KEY ) from datadog_lambda.thread_stats_writer import ThreadStatsWriter from datadog_lambda.tags import _format_dd_lambda_layer_tag diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index 5995cad7..f7190276 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -134,11 +134,10 @@ def lambda_handler(event, context): def test_datadog_lambda_wrapper_flush_in_thread(self): 
# force ThreadStats to flush in thread - os.environ["DD_FLUSH_IN_THREAD"] = "True" import datadog_lambda.metric as metric_module metric_module.lambda_stats.stop() - metric_module.init_lambda_stats() + metric_module.lambda_stats = ThreadStatsWriter(True) @datadog_lambda_wrapper def lambda_handler(event, context): @@ -159,7 +158,6 @@ def lambda_handler(event, context): # reset ThreadStats metric_module.lambda_stats.stop() metric_module.lambda_stats = ThreadStatsWriter(False) - del os.environ["DD_FLUSH_IN_THREAD"] def test_datadog_lambda_wrapper_not_flush_in_thread(self): # force ThreadStats to not flush in thread From 80375ecb50ecab665b7d4eab884a5c53358dec95 Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 16:08:12 -0400 Subject: [PATCH 05/11] pr review --- datadog_lambda/__init__.py | 3 --- datadog_lambda/api.py | 2 +- datadog_lambda/metric.py | 4 ++++ 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 8a31a53c..cf91dcc4 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -5,9 +5,6 @@ import os import logging -from datadog_lambda.api import init_api logger = logging.getLogger(__name__) logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) - -init_api() diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py index d78192ca..4c81d1aa 100644 --- a/datadog_lambda/api.py +++ b/datadog_lambda/api.py @@ -46,7 +46,7 @@ def decrypt_kms_api_key(kms_client, ciphertext): def init_api(): - if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "false": + if os.environ.get("DD_FLUSH_TO_LOG", "").lower() != "true": from datadog import api if not api._api_key: diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 94f6142f..92b7da67 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -10,10 +10,14 @@ from datadog_lambda.extension import should_use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer +from datadog_lambda.api import init_api logger = logging.getLogger(__name__) lambda_stats = None + +init_api() + if should_use_extension: from datadog_lambda.statsd_writer import StatsDWriter lambda_stats = StatsDWriter() From 40e60e0155ef127ddc7b713541ec01dc8823cd2d Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 16:13:21 -0400 Subject: [PATCH 06/11] lint --- datadog_lambda/api.py | 5 ++++- datadog_lambda/metric.py | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py index 4c81d1aa..b6b297e8 100644 --- a/datadog_lambda/api.py +++ b/datadog_lambda/api.py @@ -8,6 +8,7 @@ def decrypt_kms_api_key(kms_client, ciphertext): from botocore.exceptions import ClientError + """ Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS. For this to work properly, the Lambda function must have the appropriate IAM permissions. 
@@ -55,7 +56,9 @@ def init_api(): DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "") DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "") DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "") - DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")) + DD_API_KEY = os.environ.get( + "DD_API_KEY", os.environ.get("DATADOG_API_KEY", "") + ) if DD_API_KEY_SECRET_ARN: api._api_key = boto3.client("secretsmanager").get_secret_value( diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 92b7da67..0f9f1715 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -20,6 +20,7 @@ if should_use_extension: from datadog_lambda.statsd_writer import StatsDWriter + lambda_stats = StatsDWriter() else: # Periodical flushing in a background thread is NOT guaranteed to succeed @@ -27,6 +28,7 @@ # end of invocation. To make metrics submitted from a long-running Lambda # function available sooner, consider using the Datadog Lambda extension. from datadog_lambda.thread_stats_writer import ThreadStatsWriter + flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" lambda_stats = ThreadStatsWriter(flush_in_thread) From 245a1c66a625bb710def97c17043deddb2f9ccdd Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 16:14:33 -0400 Subject: [PATCH 07/11] lint --- datadog_lambda/metric.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 0f9f1715..1ae46564 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -28,7 +28,7 @@ # end of invocation. To make metrics submitted from a long-running Lambda # function available sooner, consider using the Datadog Lambda extension. from datadog_lambda.thread_stats_writer import ThreadStatsWriter - + flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" lambda_stats = ThreadStatsWriter(flush_in_thread) From 7ab79cb71d5c1aca25be78a5a9f39b9ad854b14f Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 16:17:12 -0400 Subject: [PATCH 08/11] lint --- tests/test_metric.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_metric.py b/tests/test_metric.py index 21088651..39288c76 100644 --- a/tests/test_metric.py +++ b/tests/test_metric.py @@ -9,10 +9,7 @@ from botocore.exceptions import ClientError as BotocoreClientError from datadog.api.exceptions import ClientError from datadog_lambda.metric import lambda_metric -from datadog_lambda.api import ( - decrypt_kms_api_key, - KMS_ENCRYPTION_CONTEXT_KEY -) +from datadog_lambda.api import decrypt_kms_api_key, KMS_ENCRYPTION_CONTEXT_KEY from datadog_lambda.thread_stats_writer import ThreadStatsWriter from datadog_lambda.tags import _format_dd_lambda_layer_tag From ec2b96ecedf46edb489b0fa4e7fae704407199a5 Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 16:33:47 -0400 Subject: [PATCH 09/11] fix condition --- datadog_lambda/api.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py index b6b297e8..167101ec 100644 --- a/datadog_lambda/api.py +++ b/datadog_lambda/api.py @@ -1,6 +1,7 @@ import os import logging import base64 +from datadog_lambda.extension import should_use_extension logger = logging.getLogger(__name__) KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName" @@ -47,7 +48,10 @@ def decrypt_kms_api_key(kms_client, ciphertext): def init_api(): - if os.environ.get("DD_FLUSH_TO_LOG", "").lower() != "true": + if ( 
+ not should_use_extension + and os.environ.get("DD_FLUSH_TO_LOG", "").lower() != "true" + ): from datadog import api if not api._api_key: From fd79f8e8baa5a13203cf40592ffbc1438fc02302 Mon Sep 17 00:00:00 2001 From: maxday Date: Tue, 24 Aug 2021 16:37:23 -0400 Subject: [PATCH 10/11] better condition --- datadog_lambda/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py index 167101ec..a30b12d1 100644 --- a/datadog_lambda/api.py +++ b/datadog_lambda/api.py @@ -50,7 +50,7 @@ def decrypt_kms_api_key(kms_client, ciphertext): def init_api(): if ( not should_use_extension - and os.environ.get("DD_FLUSH_TO_LOG", "").lower() != "true" + and not os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" ): from datadog import api From 01afee8d21a02d808c29d00abe7c296c7edd60bd Mon Sep 17 00:00:00 2001 From: maxday Date: Thu, 26 Aug 2021 09:37:40 -0400 Subject: [PATCH 11/11] Use extension should override DD_FLUSH_TO_LOGS --- datadog_lambda/metric.py | 17 +++++++++++++---- tests/test_metric.py | 13 +++++++++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 1ae46564..c3e39c97 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -45,15 +45,24 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal Otherwise, the metrics will be submitted to the Datadog API periodically and at the end of the function execution in a background thread. + + Note that if the extension is present, it will override the DD_FLUSH_TO_LOG value + and always use the layer to send metrics to the extension """ flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" tags = tag_dd_lambda_layer(tags) - if flush_to_logs or (force_async and not should_use_extension): - write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags) - else: - logger.debug("Sending metric %s to Datadog via lambda layer", metric_name) + if should_use_extension: lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp) + else: + if flush_to_logs or force_async: + write_metric_point_to_stdout( + metric_name, value, timestamp=timestamp, tags=tags + ) + else: + lambda_stats.distribution( + metric_name, value, tags=tags, timestamp=timestamp + ) def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]): diff --git a/tests/test_metric.py b/tests/test_metric.py index 39288c76..8f896481 100644 --- a/tests/test_metric.py +++ b/tests/test_metric.py @@ -8,6 +8,8 @@ from botocore.exceptions import ClientError as BotocoreClientError from datadog.api.exceptions import ClientError + + from datadog_lambda.metric import lambda_metric from datadog_lambda.api import decrypt_kms_api_key, KMS_ENCRYPTION_CONTEXT_KEY from datadog_lambda.thread_stats_writer import ThreadStatsWriter @@ -33,6 +35,17 @@ def test_lambda_metric_tagged_with_dd_lambda_layer(self): ] ) + # let's fake that the extension is present, this should override DD_FLUSH_TO_LOG + @patch("datadog_lambda.metric.should_use_extension", True) + def test_lambda_metric_flush_to_log_with_extension(self): + os.environ["DD_FLUSH_TO_LOG"] = "True" + lambda_metric("test", 1) + expected_tag = _format_dd_lambda_layer_tag() + self.mock_metric_lambda_stats.distribution.assert_has_calls( + [call("test", 1, timestamp=None, tags=[expected_tag])] + ) + del os.environ["DD_FLUSH_TO_LOG"] + def test_lambda_metric_flush_to_log(self): os.environ["DD_FLUSH_TO_LOG"] = "True"
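
For readers following the series: after patch 11, metric routing in `lambda_metric` comes down to three paths. The sketch below restates that decision logic in a standalone form for illustration only; the helper name `route_metric`, its `extension_running` parameter, and the returned labels are invented here and are not part of `datadog_lambda` — the actual logic lives in `datadog_lambda/metric.py` and keys off `should_use_extension`.

import os


def route_metric(metric_name, value, force_async=False, extension_running=False):
    """Return a label for where a distribution metric would be sent.

    A simplified, hypothetical restatement of the routing in lambda_metric
    after this patch series; not the shipped implementation.
    """
    flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"

    if extension_running:
        # The extension overrides DD_FLUSH_TO_LOG: when it is present, metrics
        # always go to the local StatsD listener (StatsDWriter).
        return "statsd -> Datadog Lambda extension"
    if flush_to_logs or force_async:
        # Without the extension, DD_FLUSH_TO_LOG or force_async selects
        # log-based submission for the Datadog Forwarder to pick up.
        return "stdout log line -> Datadog Forwarder"
    # Otherwise the metric is buffered by ThreadStatsWriter and flushed to the
    # Datadog API at the end of the invocation.
    return "ThreadStats -> Datadog API"


if __name__ == "__main__":
    print(route_metric("test.metric", 1, extension_running=True))  # extension path
    print(route_metric("test.metric", 1, force_async=True))        # log path
    print(route_metric("test.metric", 1))                          # API path

Checking extension presence first is also what enables the lazy-load goal of patch 01: ThreadStats, the Datadog API client, and boto3 (for API key lookup) are only imported on the branches that actually need them.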