Skip to content

Refactoring to allow lazy loading of datadog.api #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into the base branch from the feature branch on Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions datadog_lambda/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import logging
import base64
from datadog_lambda.extension import should_use_extension

logger = logging.getLogger(__name__)
KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName"


def decrypt_kms_api_key(kms_client, ciphertext):
    """
    Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS.
    For this to work properly, the Lambda function must have the appropriate IAM permissions.

    Args:
        kms_client: The KMS client to use for decryption
        ciphertext (string): The base64-encoded ciphertext to decrypt

    Returns:
        string: the decrypted, UTF-8 decoded API key
    """
    # Deferred import: botocore is only needed when a KMS-encrypted key is
    # configured. NOTE: the docstring must stay the FIRST statement of the
    # function (it was previously placed after this import, which silently
    # left the function with no docstring).
    from botocore.exceptions import ClientError

    decoded_bytes = base64.b64decode(ciphertext)

    # When the API key is encrypted using the AWS console, the function name is
    # added as an encryption context. When the API key is encrypted using the
    # AWS CLI, no encryption context is added. We need to try decrypting the
    # API key both with and without the encryption context.

    # Try without encryption context, in case API key was encrypted using the AWS CLI
    function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
    try:
        plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[
            "Plaintext"
        ].decode("utf-8")
    except ClientError:
        # Implicit string concatenation instead of a backslash continuation,
        # which used to embed a run of indentation spaces in the log message.
        logger.debug(
            "Failed to decrypt ciphertext without encryption context, "
            "retrying with encryption context"
        )
        # Try with encryption context, in case API key was encrypted using the AWS Console
        plaintext = kms_client.decrypt(
            CiphertextBlob=decoded_bytes,
            EncryptionContext={
                KMS_ENCRYPTION_CONTEXT_KEY: function_name,
            },
        )["Plaintext"].decode("utf-8")

    return plaintext


def init_api():
    """Lazily configure the ``datadog.api`` client for direct metric submission.

    Does nothing when the Datadog extension is in use or when metrics are
    flushed to logs (``DD_FLUSH_TO_LOG=true``), since the heavy ``datadog``
    and ``boto3`` imports are useless in those modes.

    The API key is resolved, in order of precedence, from:
    ``DD_API_KEY_SECRET_ARN`` (Secrets Manager), ``DD_API_KEY_SSM_NAME``
    (SSM parameter), ``DD_KMS_API_KEY`` (KMS-encrypted), then
    ``DD_API_KEY``/``DATADOG_API_KEY`` (plaintext).
    """
    flush_to_log = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
    if should_use_extension or flush_to_log:
        # Nothing to configure; the extension or the log forwarder handles it.
        return

    from datadog import api

    if not api._api_key:
        import boto3

        secret_arn = os.environ.get("DD_API_KEY_SECRET_ARN", "")
        ssm_name = os.environ.get("DD_API_KEY_SSM_NAME", "")
        kms_ciphertext = os.environ.get("DD_KMS_API_KEY", "")
        plain_key = os.environ.get(
            "DD_API_KEY", os.environ.get("DATADOG_API_KEY", "")
        )

        if secret_arn:
            secret = boto3.client("secretsmanager").get_secret_value(
                SecretId=secret_arn
            )
            api._api_key = secret["SecretString"]
        elif ssm_name:
            parameter = boto3.client("ssm").get_parameter(
                Name=ssm_name, WithDecryption=True
            )
            api._api_key = parameter["Parameter"]["Value"]
        elif kms_ciphertext:
            api._api_key = decrypt_kms_api_key(
                boto3.client("kms"), kms_ciphertext
            )
        else:
            api._api_key = plain_key

    # Log only the key length, never the key itself.
    logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))

    # Set DATADOG_HOST, to send data to a non-default Datadog datacenter
    api._api_host = os.environ.get(
        "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
    )
    logger.debug("Setting DATADOG_HOST to %s", api._api_host)

    # Unmute exceptions from datadog api client, so we can catch and handle them
    api._mute = False
192 changes: 13 additions & 179 deletions datadog_lambda/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,124 +6,31 @@
import os
import json
import time
import base64
import logging

from botocore.exceptions import ClientError
import boto3
from datadog import api, initialize, statsd
from datadog.threadstats import ThreadStats
from datadog_lambda.extension import should_use_extension
from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer

KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName"
ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"
from datadog_lambda.api import init_api

logger = logging.getLogger(__name__)

lambda_stats = None

init_api()

class StatsWriter:
    """Abstract interface for distribution-metric writers.

    Concrete writers (StatsD-backed or ThreadStats-backed) override all
    three methods; this base class only defines the contract.
    """

    def distribution(self, metric_name, value, tags=[], timestamp=None):
        """Record one distribution metric point."""
        raise NotImplementedError()

    def flush(self):
        """Send any buffered metric points to their destination."""
        raise NotImplementedError()

    def stop(self):
        """Release any resources held by the writer."""
        raise NotImplementedError()


class StatsDWriter(StatsWriter):
    """
    Writes distribution metrics using StatsD protocol
    """

    def __init__(self):
        # Target the DogStatsD server listening on the local default port 8125.
        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
        initialize(**options)

    def distribution(self, metric_name, value, tags=[], timestamp=None):
        # NOTE(review): `timestamp` is accepted for interface compatibility
        # but is not forwarded to statsd.
        statsd.distribution(metric_name, value, tags=tags)

    def flush(self):
        # No-op: points are handed to the statsd client immediately, so there
        # is nothing buffered in this writer to flush.
        pass

    def stop(self):
        # No-op: this writer holds no resources to release.
        pass
if should_use_extension:
from datadog_lambda.statsd_writer import StatsDWriter

lambda_stats = StatsDWriter()
else:
# Periodical flushing in a background thread is NOT guaranteed to succeed
# and leads to data loss. When disabled, metrics are only flushed at the
# end of invocation. To make metrics submitted from a long-running Lambda
# function available sooner, consider using the Datadog Lambda extension.
from datadog_lambda.thread_stats_writer import ThreadStatsWriter

class ThreadStatsWriter(StatsWriter):
    """
    Writes distribution metrics using the ThreadStats class
    """

    def __init__(self, flush_in_thread):
        # compress_payload=True: send compressed payloads to the Datadog API.
        self.thread_stats = ThreadStats(compress_payload=True)
        self.thread_stats.start(flush_in_thread=flush_in_thread)

    def distribution(self, metric_name, value, tags=[], timestamp=None):
        self.thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )

    def flush(self):
        """Flush distributions from ThreadStats to Datadog.

        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
        to gain better control over exception handling.
        """
        # Grab every pending distribution (float("inf") cutoff = take all).
        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
        count_dists = len(dists)
        if not count_dists:
            logger.debug("No distributions to flush. Continuing.")

        self.thread_stats.flush_count += 1
        logger.debug(
            "Flush #%s sending %s distributions",
            self.thread_stats.flush_count,
            count_dists,
        )
        try:
            self.thread_stats.reporter.flush_distributions(dists)
        except Exception as e:
            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
            # but comprehensive tests suggest that it is safe to retry on this specific error.
            if isinstance(
                e, api.exceptions.ClientError
            ) and "RemoteDisconnected" in str(e):
                logger.debug(
                    "Retry flush #%s due to RemoteDisconnected",
                    self.thread_stats.flush_count,
                )
                try:
                    self.thread_stats.reporter.flush_distributions(dists)
                except Exception:
                    # Retry failed too: drop the batch but keep the invocation alive.
                    logger.debug(
                        "Flush #%s failed after retry",
                        self.thread_stats.flush_count,
                        exc_info=True,
                    )
            else:
                # Any other failure: log and drop; losing metrics is preferable
                # to failing the Lambda invocation.
                logger.debug(
                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
                )

    def stop(self):
        self.thread_stats.stop()


def init_lambda_stats():
    """(Re)initialize the module-level ``lambda_stats`` writer.

    Chooses a StatsD-backed writer when the Datadog extension is present,
    and a ThreadStats-backed writer otherwise.
    """
    global lambda_stats
    if not should_use_extension:
        # Periodical flushing in a background thread is NOT guaranteed to succeed
        # and leads to data loss. When disabled, metrics are only flushed at the
        # end of invocation. To make metrics submitted from a long-running Lambda
        # function available sooner, consider using the Datadog Lambda extension.
        background_flush = (
            os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
        )
        lambda_stats = ThreadStatsWriter(background_flush)
    else:
        lambda_stats = StatsDWriter()
flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
lambda_stats = ThreadStatsWriter(flush_in_thread)


def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
Expand All @@ -139,7 +46,6 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
periodically and at the end of the function execution in a
background thread.
"""
global lambda_stats
flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
tags = tag_dd_lambda_layer(tags)

Expand Down Expand Up @@ -168,7 +74,6 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):


def flush_stats():
    """Flush buffered metrics via the active module-level writer."""
    # `lambda_stats` is only read here, so the previous `global lambda_stats`
    # declaration was a no-op and has been removed.
    lambda_stats.flush()


Expand Down Expand Up @@ -217,74 +122,3 @@ def submit_errors_metric(lambda_context):
lambda_context (dict): Lambda context dict passed to the function by AWS
"""
submit_enhanced_metric("errors", lambda_context)


def decrypt_kms_api_key(kms_client, ciphertext):
    """
    Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS.
    For this to work properly, the Lambda function must have the appropriate IAM permissions.

    Args:
        kms_client: The KMS client to use for decryption
        ciphertext (string): The base64-encoded ciphertext to decrypt

    Returns:
        string: the decrypted, UTF-8 decoded API key
    """
    decoded_bytes = base64.b64decode(ciphertext)

    """
    When the API key is encrypted using the AWS console, the function name is added as an
    encryption context. When the API key is encrypted using the AWS CLI, no encryption context
    is added. We need to try decrypting the API key both with and without the encryption context.
    """
    # Try without encryption context, in case API key was encrypted using the AWS CLI
    function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
    try:
        plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[
            "Plaintext"
        ].decode("utf-8")
    except ClientError:
        logger.debug(
            "Failed to decrypt ciphertext without encryption context, \
            retrying with encryption context"
        )
        # Try with encryption context, in case API key was encrypted using the AWS Console
        plaintext = kms_client.decrypt(
            CiphertextBlob=decoded_bytes,
            EncryptionContext={
                KMS_ENCRYPTION_CONTEXT_KEY: function_name,
            },
        )["Plaintext"].decode("utf-8")

    return plaintext


# Set API Key
# Resolution order: Secrets Manager ARN, then SSM parameter name, then a
# KMS-encrypted key, and finally a plaintext key from the environment.
if not api._api_key:
    DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
    DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
    DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
    DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))

    if DD_API_KEY_SECRET_ARN:
        api._api_key = boto3.client("secretsmanager").get_secret_value(
            SecretId=DD_API_KEY_SECRET_ARN
        )["SecretString"]
    elif DD_API_KEY_SSM_NAME:
        api._api_key = boto3.client("ssm").get_parameter(
            Name=DD_API_KEY_SSM_NAME, WithDecryption=True
        )["Parameter"]["Value"]
    elif DD_KMS_API_KEY:
        kms_client = boto3.client("kms")
        api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
    else:
        api._api_key = DD_API_KEY

    # Log only the key length, never the key itself.
    logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))

# Set DATADOG_HOST, to send data to a non-default Datadog datacenter
api._api_host = os.environ.get(
    "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
)
logger.debug("Setting DATADOG_HOST to %s", api._api_host)

# Unmute exceptions from datadog api client, so we can catch and handle them
api._mute = False
9 changes: 9 additions & 0 deletions datadog_lambda/stats_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class StatsWriter:
    """Abstract interface for distribution-metric writers.

    Concrete writers (StatsD-backed or ThreadStats-backed) override all
    three methods; this base class only defines the contract.
    """

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Record one distribution metric point.

        Args:
            metric_name (str): name of the metric
            value (float): the sampled value
            tags (list or None): tags to attach to the point
            timestamp (int or None): unix timestamp of the point

        Note: the default for ``tags`` is ``None`` rather than a mutable
        ``[]``, which would be shared across calls.
        """
        raise NotImplementedError()

    def flush(self):
        """Send any buffered metric points to their destination."""
        raise NotImplementedError()

    def stop(self):
        """Release any resources held by the writer."""
        raise NotImplementedError()
21 changes: 21 additions & 0 deletions datadog_lambda/statsd_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from datadog_lambda.stats_writer import StatsWriter
from datadog import initialize, statsd


class StatsDWriter(StatsWriter):
    """
    Writes distribution metrics using StatsD protocol
    """

    def __init__(self):
        # Target the DogStatsD server listening on the local default port 8125.
        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
        initialize(**options)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        # Default for `tags` is None (matching datadog's own statsd API)
        # instead of a mutable `[]` shared across calls.
        # NOTE(review): `timestamp` is accepted for interface compatibility
        # but is not forwarded to statsd.
        statsd.distribution(metric_name, value, tags=tags)

    def flush(self):
        # No-op: points are handed to the statsd client immediately, so there
        # is nothing buffered in this writer to flush.
        pass

    def stop(self):
        # No-op: this writer holds no resources to release.
        pass
Loading