From 113576a9cada55efe0bf0672c287ad8348a96c56 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 2 Feb 2023 12:01:55 -0500 Subject: [PATCH 01/13] (wip) coldstart tracing POC checkin --- datadog_lambda/__init__.py | 5 + datadog_lambda/cold_start_tracing.py | 122 ++++++++++++++++++ datadog_lambda/module.py | 182 +++++++++++++++++++++++++++ datadog_lambda/tracing.py | 8 ++ datadog_lambda/wrapper.py | 11 +- 5 files changed, 327 insertions(+), 1 deletion(-) create mode 100644 datadog_lambda/cold_start_tracing.py create mode 100644 datadog_lambda/module.py diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index cbec8f4f..96458a3a 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -6,6 +6,11 @@ import importlib_metadata __version__ = importlib_metadata.version(__name__) +import sys +print(f"__INIT__BEFORE_INSTALL {sys.meta_path}") +from datadog_lambda.module import ModuleWatchdog +ModuleWatchdog.install() +print(f"__INIT__AFTER_INSTALL {sys.meta_path}") import os import logging diff --git a/datadog_lambda/cold_start_tracing.py b/datadog_lambda/cold_start_tracing.py new file mode 100644 index 00000000..395f16f7 --- /dev/null +++ b/datadog_lambda/cold_start_tracing.py @@ -0,0 +1,122 @@ + +import os +import time +from typing import List +class ImportNode(object): + def __init__(self, name, origin, start_time_ns, end_time_ns= None): + self.module_name = name + self.file_name = origin + self.start_time_ns = start_time_ns + self.end_time_ns = end_time_ns + self.children = [] + +root_nodes = [] + +import_stack = [] + +total = 0 + +n_spans = 0 + +skips = 0 + +def push_node(module_spec): + global total + total += 1 + global root_nodes + node = ImportNode(module_spec.name, module_spec.origin, time.time_ns()) + global import_stack + # print(f'Pushing node for {module_spec.name},{len(import_stack)} on stack, {len(root_nodes)} roots') + if import_stack: + import_stack[-1].children.append(node) + import_stack.append(node) + +def pop_node(fullname): + global root_nodes + end_time_ns = time.time_ns() + global import_stack + node = import_stack.pop() + if node: + node.end_time_ns = end_time_ns + if not import_stack: # import_stack empty, a root node has been found + root_nodes.append(node) + # print(f'Poping node {len(import_stack)} left, {len(root_nodes)} roots') + + +class ColdStartTracer(object): + + def __init__(self, tracer, parent_span, cold_start_span_finish_time_ns, trace_ctx, min_duration = 3): + self._tracer = tracer + self.functio_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") + self.parent_span = parent_span + self.cold_start_span_finish_time_ns = cold_start_span_finish_time_ns + self.min_duration = min_duration + self.trace_ctx = trace_ctx + + def trace(self, root_nodes: List[ImportNode]): + cold_start_span_start_time_ns = root_nodes[0].start_time_ns + cold_start_span = self.create_cold_start_span(cold_start_span_start_time_ns) + for import_node in root_nodes: + self.trace_tree(import_node, cold_start_span) + + def trace_tree(self, import_node: ImportNode, parent_span): + if import_node.end_time_ns - import_node.start_time_ns < self.min_duration: + global skips + skips += 1 + return + span_kwargs = { + "service": "aws.lambda", + "resource": import_node.module_name, + "span_type": "aws.lambda.import", + } + span = self._tracer.trace("aws.lambda.import", **span_kwargs) + global n_spans + n_spans += 1 + + tags = { + "resource_names": import_node.module_name, + "resource.name": import_node.module_name, + "filename": import_node.file_name, + "operation_name": self.get_operation_name(import_node.file_name) + } + span.set_tags(tags) + if parent_span: + span.parent_id = parent_span.span_id + span.start_ns = import_node.start_time_ns + self.finish_ns(span, import_node.end_time_ns) + for child_node in import_node.children: + self.trace_tree(child_node, span) + + + def create_cold_start_span(self, start_time_ns): + span_kwargs = { + "service": "aws.lambda", + "resource": self.functio_name, + "span_type": "aws.lambda.import", + } + span = self._tracer.trace("aws.lambda.import", **span_kwargs) + # tags = { + + # } + # span.set_tags(tags) + self._tracer.context_provider.activate(self.trace_ctx) # because it was reset by finish in wrapper + # trace_ctx = self._tracer.current_trace_context() + # print(f"SELF.TRACE_CONTEXT {self.trace_ctx} Trace_ctx: {trace_ctx}") + span.start_ns = start_time_ns + self.finish_ns(span, self.cold_start_span_finish_time_ns) + return span + + def finish_ns(self, span, finish_time_ns): + span.finish(finish_time_ns / 1e9) + self._tracer.context_provider.activate(self.trace_ctx) # reactivate required after each finish + + def get_operation_name(self, filename: str): + if filename.startswith("/opt/"): + return "aws.lambda.import_layer" + elif filename.startswith("/var/runtime/"): + return "aws.lambda.import_runtime" + elif '/' in filename: + return "aws.lambda.import" + else: + return "aws.lambda.import_core_module" + diff --git a/datadog_lambda/module.py b/datadog_lambda/module.py new file mode 100644 index 00000000..4a593f15 --- /dev/null +++ b/datadog_lambda/module.py @@ -0,0 +1,182 @@ +import sys +from os.path import abspath +from os.path import isfile +from types import ModuleType +from typing import Optional +from typing import Set +from typing import Union + +# Borrowed from the wrapt module +# https://github.com/GrahamDumpleton/wrapt/blob/df0e62c2740143cceb6cafea4c306dae1c559ef8/src/wrapt/importer.py + +# if PY2 panic (sys.version_info < (3, 6)) else: +from importlib.abc import Loader +from importlib.machinery import ModuleSpec +from importlib.util import find_spec +from datadog_lambda.cold_start_tracing import push_node, pop_node + + +def origin(module): + # type: (ModuleType) -> str + """Get the origin source file of the module.""" + try: + # DEV: Use object.__getattribute__ to avoid potential side-effects. + orig = abspath(object.__getattribute__(module, "__file__")) + except (AttributeError, TypeError): + # Module is probably only partially initialised, so we look at its + # spec instead + try: + # DEV: Use object.__getattribute__ to avoid potential side-effects. + orig = abspath(object.__getattribute__(module, "__spec__").origin) + except (AttributeError, ValueError, TypeError): + orig = None + + if orig is not None and isfile(orig): + if orig.endswith(".pyc"): + orig = orig[:-1] + return orig + + return "" + + + +class _ImportHookChainedLoader(Loader): + def __init__(self, loader): + # type: (Loader) -> None + self.loader = loader + + # # DEV: load_module is deprecated so we define it at runtime if also + # # defined by the default loader. We also check and define for the + # # methods that are supposed to replace the load_module functionality. + # if hasattr(loader, "load_module"): + # self.load_module = self._load_module # type: ignore[assignment] + if hasattr(loader, "create_module"): + self.create_module = self._create_module # type: ignore[assignment] + if hasattr(loader, "exec_module"): + self.exec_module = self._exec_module # type: ignore[assignment] + + def __getattribute__(self, name): + if name == "__class__": + # Make isinstance believe that self is also an instance of + # type(self.loader). This is required, e.g. by some tools, like + # slotscheck, that can handle known loaders only. + return self.loader.__class__ + + return super(_ImportHookChainedLoader, self).__getattribute__(name) + + def __getattr__(self, name): + # Proxy any other attribute access to the underlying loader. + return getattr(self.loader, name) + + def _create_module(self, spec): + # print(f"[CST] Create module for spec {spec}") + push_node(spec) + return self.loader.create_module(spec) + + def _exec_module(self, module): + # print(f"[CST] Exec module for spec {module}") + self.loader.exec_module(module) + pop_node(module) + + +class ModuleWatchdog(object): + + _instance = None # type: Optional[ModuleWatchdog] + + def __init__(self): + self._finding = set() # type: Set[str] + + def __repr__(self) -> str: + return "ModuleWatchdog" + + def _add_to_meta_path(self): + # type: () -> None + sys.meta_path.insert(0, self) # type: ignore[arg-type] + + @classmethod + def _find_in_meta_path(cls): + # type: () -> Optional[int] + for i, meta_path in enumerate(sys.meta_path): + if type(meta_path) is cls: + return i + return None + + @classmethod + def _remove_from_meta_path(cls): + # type: () -> None + i = cls._find_in_meta_path() + if i is not None: + sys.meta_path.pop(i) + + def find_module(self, fullname, path=None): + # type: (str, Optional[str]) -> Union[ModuleWatchdog, _ImportHookChainedLoader, None] + if fullname in self._finding: + return None + + self._finding.add(fullname) + # print(f"[CST] finding module for {fullname}") + try: + loader = getattr(find_spec(fullname), "loader", None) + if loader is not None: + if not isinstance(loader, _ImportHookChainedLoader): + loader = _ImportHookChainedLoader(loader) + + return loader + finally: + self._finding.remove(fullname) + + return None + + def find_spec(self, fullname, path=None, target=None): + # type: (str, Optional[str], Optional[ModuleType]) -> Optional[ModuleSpec] + if fullname in self._finding: + return None + + self._finding.add(fullname) + # print(f"[CST] finding spec for {fullname} ") + try: + spec = find_spec(fullname) + if spec is None: + return None + loader = getattr(spec, "loader", None) + + if loader is not None: + if not isinstance(loader, _ImportHookChainedLoader): + spec.loader = _ImportHookChainedLoader(loader) + # push_node(spec) + # cast(_ImportHookChainedLoader, spec.loader).add_callback(type(self), self.after_import) + + return spec + + finally: + self._finding.remove(fullname) + + + @classmethod + def _check_installed(cls): + # type: () -> None + if not cls.is_installed(): + raise RuntimeError("%s is not installed" % cls.__name__) + + @classmethod + def install(cls): + # type: () -> None + """Install the module watchdog.""" + if cls.is_installed(): + raise RuntimeError("%s is already installed" % cls.__name__) + this = cls() + cls._instance = this + this._add_to_meta_path() + + @classmethod + def is_installed(cls): + """Check whether this module watchdog class is installed.""" + return cls._instance is not None and type(cls._instance) is cls + + @classmethod + def uninstall(cls): + cls._check_installed() + cls._remove_from_meta_path() + cls._instance = None + + diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 19ef8c04..0470a151 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -1163,6 +1163,14 @@ def mark_trace_as_error_for_5xx_responses(context, status_code, span): span.error = 1 +from datadog_lambda.cold_start_tracing import ColdStartTracer, root_nodes, skips, total, n_spans + +def trace_cold_start(span, span_or_inferred_span, trace_ctx): + cold_start_tracer = ColdStartTracer(tracer, span_or_inferred_span, span.start_ns, trace_ctx) + print(f"[CST] ROOT NODES LEN: {len(root_nodes)}") + cold_start_tracer.trace(root_nodes) + print(f"Total {total}, {n_spans} SPANS, {skips} skips") + class InferredSpanInfo(object): BASE_NAME = "_inferred_span" SYNCHRONICITY = f"{BASE_NAME}.synchronicity" diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 51ffccec..cbc995b3 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -2,7 +2,7 @@ # under the Apache License Version 2.0. # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2019 Datadog, Inc. - +print("BEFORE WRAPPER BASE64 Import") import base64 import os import logging @@ -38,6 +38,8 @@ create_inferred_span, InferredSpanInfo, is_authorizer_response, + trace_cold_start, + tracer, ) from datadog_lambda.trigger import ( extract_trigger_tags, @@ -109,6 +111,7 @@ def __new__(cls, func): def __init__(self, func): """Executes when the wrapped function gets wrapped""" try: + # patch_import() self.func = func self.flush_to_log = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" self.logs_injection = ( @@ -258,6 +261,8 @@ def _after(self, event, context): self.trigger_tags, XraySubsegment.LAMBDA_FUNCTION_TAGS_KEY ) + trace_ctx = tracer.current_trace_context() + print(f"PROVIDING_TRACE_CONTEXT {trace_ctx}") if self.span: if dd_capture_lambda_payload_enabled: tag_object(self.span, "function.request", event) @@ -286,6 +291,10 @@ def _after(self, event, context): event.get("requestContext", {}).get("requestId") ) logger.debug("datadog_lambda_wrapper _after() done") + + span = self.span or self.inferred_span + trace_cold_start(self.span, span, trace_ctx) + except Exception: traceback.print_exc() From 4250a9cbd78ee8e1ee3a86def47c1b9f226dc09d Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Mon, 6 Feb 2023 14:03:55 -0500 Subject: [PATCH 02/13] feat: rename cold start span so it is nested properly. --- datadog_lambda/cold_start_tracing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datadog_lambda/cold_start_tracing.py b/datadog_lambda/cold_start_tracing.py index 395f16f7..cf884fe8 100644 --- a/datadog_lambda/cold_start_tracing.py +++ b/datadog_lambda/cold_start_tracing.py @@ -47,7 +47,7 @@ class ColdStartTracer(object): def __init__(self, tracer, parent_span, cold_start_span_finish_time_ns, trace_ctx, min_duration = 3): self._tracer = tracer - self.functio_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") + self.function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") self.parent_span = parent_span self.cold_start_span_finish_time_ns = cold_start_span_finish_time_ns self.min_duration = min_duration @@ -91,10 +91,10 @@ def trace_tree(self, import_node: ImportNode, parent_span): def create_cold_start_span(self, start_time_ns): span_kwargs = { "service": "aws.lambda", - "resource": self.functio_name, - "span_type": "aws.lambda.import", + "resource": self.function_name, + "span_type": "aws.lambda.load", } - span = self._tracer.trace("aws.lambda.import", **span_kwargs) + span = self._tracer.trace("aws.lambda.load", **span_kwargs) # tags = { # } From 202f1cc479f4969638bb25198227309f0e0c71d6 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 9 Feb 2023 22:03:54 -0500 Subject: [PATCH 03/13] simplify how to wrap methods with push_node and pop_node and bug fixes --- datadog_lambda/__init__.py | 24 +++-- datadog_lambda/cold_start.py | 180 ++++++++++++++++++++++++++++++++++ datadog_lambda/module.py | 182 ----------------------------------- datadog_lambda/tracing.py | 8 -- datadog_lambda/wrapper.py | 47 +++++++-- 5 files changed, 237 insertions(+), 204 deletions(-) delete mode 100644 datadog_lambda/module.py diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 96458a3a..6b9d21fe 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -1,3 +1,21 @@ +from datadog_lambda.cold_start import is_cold_start, wrap_find_spec + +if is_cold_start(): + import os + + if ( + os.environ.get("DD_TRACE_ENABLED", "true").lower() == "true" + and os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true" + ): + from sys import version_info, meta_path + + if version_info >= (3, 7): # current implementation only support version > 3.7 + for importer in meta_path: + try: + importer.find_spec = wrap_find_spec(importer.find_spec) + except: + pass + # The minor version corresponds to the Lambda layer version. # E.g.,, version 0.5.0 gets packaged into layer version 5. try: @@ -6,13 +24,7 @@ import importlib_metadata __version__ = importlib_metadata.version(__name__) -import sys -print(f"__INIT__BEFORE_INSTALL {sys.meta_path}") -from datadog_lambda.module import ModuleWatchdog -ModuleWatchdog.install() -print(f"__INIT__AFTER_INSTALL {sys.meta_path}") -import os import logging logger = logging.getLogger(__name__) diff --git a/datadog_lambda/cold_start.py b/datadog_lambda/cold_start.py index c8862bf1..1c4aad08 100644 --- a/datadog_lambda/cold_start.py +++ b/datadog_lambda/cold_start.py @@ -1,5 +1,12 @@ +import time +import os +from importlib.abc import Loader +from typing import List + _cold_start = True _lambda_container_initialized = False +root_nodes = [] +import_stack = [] def set_cold_start(): @@ -21,3 +28,176 @@ def is_cold_start(): def get_cold_start_tag(): """Returns the cold start tag to be used in metrics""" return "cold_start:{}".format(str(is_cold_start()).lower()) + + +class ImportNode(object): + def __init__(self, module_name, full_file_path, start_time_ns, end_time_ns=None): + self.module_name = module_name + self.full_file_path = full_file_path + self.start_time_ns = start_time_ns + self.end_time_ns = end_time_ns + self.children = [] + + +def push_node(module_name, file_path): + node = ImportNode(module_name, file_path, time.time_ns()) + if import_stack: + import_stack[-1].children.append(node) + import_stack.append(node) + + +def pop_node(module_name): + if not import_stack: + return + node = import_stack.pop() + if node.module_name != module_name: + return + end_time_ns = time.time_ns() + node.end_time_ns = end_time_ns + if not import_stack: # import_stack empty, a root node has been found + root_nodes.append(node) + + +def wrap_exec_module(original_exec_module): + def wrapped_method(module): + should_pop = False + spec = module.__spec__ + try: + push_node(spec.name, spec.origin) + should_pop = True + except: + pass + try: + return original_exec_module(module) + finally: + if should_pop: + pop_node(spec.name) + + return wrapped_method + + +def wrap_load_module(original_load_module): + def wrapped_method(fullname): + should_pop = False + try: + push_node(fullname, fullname) + should_pop = True + except: + pass + try: + return original_load_module(fullname) + finally: + if should_pop: + pop_node(fullname) + + return wrapped_method + + +def wrap_find_spec(original_find_spec): + def wrapped_find_spec(*args, **kwargs): + spec = original_find_spec(*args, **kwargs) + if spec is None: + return None + loader = getattr(spec, "loader", None) + if loader is not None: + if hasattr(loader, "exec_module") and hasattr(loader, "create_module"): + loader.exec_module = wrap_exec_module(loader.exec_module) + if hasattr(loader, "load_module"): # legacy support + loader.load_module = wrap_load_module(loader.load_module) + return spec + + return wrapped_find_spec + + +class ColdStartTracer(object): + def __init__( + self, + tracer, + function_name, + cold_start_span_finish_time_ns, + trace_ctx, + min_duration_ms: int, + ignored_libs: List[str] = [], + ): + self._tracer = tracer + self.function_name = function_name + self.cold_start_span_finish_time_ns = cold_start_span_finish_time_ns + self.min_duration_ms = min_duration_ms + self.trace_ctx = trace_ctx + self.ignored_libs = ignored_libs + self.need_to_reactivate_context = True + + def trace(self, root_nodes: List[ImportNode] = root_nodes): + if not root_nodes: + return + cold_start_span_start_time_ns = root_nodes[0].start_time_ns + cold_start_span = self.create_cold_start_span(cold_start_span_start_time_ns) + while root_nodes: + root_node = root_nodes.pop() + self.trace_tree(root_node, cold_start_span) + self.finish_span(cold_start_span, self.cold_start_span_finish_time_ns) + + def trace_tree(self, import_node: ImportNode, parent_span): + if ( + import_node.end_time_ns - import_node.start_time_ns + < self.min_duration_ms * 1e6 + or import_node.module_name in self.ignored_libs + ): + return + + span = self.start_span( + "aws.lambda.import", import_node.module_name, import_node.start_time_ns + ) + tags = { + "resource_names": import_node.module_name, + "resource.name": import_node.module_name, + "filename": import_node.full_file_path, + "operation_name": self.get_operation_name(import_node.full_file_path), + } + span.set_tags(tags) + if parent_span: + span.parent_id = parent_span.span_id + for child_node in import_node.children: + self.trace_tree(child_node, span) + self.finish_span(span, import_node.end_time_ns) + + def create_cold_start_span(self, start_time_ns): + span = self.start_span("aws.lambda.load", self.function_name, start_time_ns) + tags = { + "resource_names": self.function_name, + "resource.name": self.function_name, + "operation_name": "aws.lambda.load", + } + span.set_tags(tags) + return span + + def start_span(self, span_type, resource, start_time_ns): + if self.need_to_reactivate_context: + self._tracer.context_provider.activate( + self.trace_ctx + ) # reactivate required after each finish() call + self.need_to_reactivate_context = False + span_kwargs = { + "service": "aws.lambda", + "resource": resource, + "span_type": span_type, + } + span = self._tracer.trace(span_type, **span_kwargs) + span.start_ns = start_time_ns + return span + + def finish_span(self, span, finish_time_ns): + span.finish(finish_time_ns / 1e9) + self.need_to_reactivate_context = True + + def get_operation_name(self, filename: str): + if filename is None: + return "aws.lambda.import_core_module" + if not isinstance(filename, str): + return "aws.lambda.import" + if filename.startswith("/opt/"): + return "aws.lambda.import_layer" + elif filename.startswith("/var/lang/"): + return "aws.lambda.import_runtime" + else: + return "aws.lambda.import" diff --git a/datadog_lambda/module.py b/datadog_lambda/module.py deleted file mode 100644 index 4a593f15..00000000 --- a/datadog_lambda/module.py +++ /dev/null @@ -1,182 +0,0 @@ -import sys -from os.path import abspath -from os.path import isfile -from types import ModuleType -from typing import Optional -from typing import Set -from typing import Union - -# Borrowed from the wrapt module -# https://github.com/GrahamDumpleton/wrapt/blob/df0e62c2740143cceb6cafea4c306dae1c559ef8/src/wrapt/importer.py - -# if PY2 panic (sys.version_info < (3, 6)) else: -from importlib.abc import Loader -from importlib.machinery import ModuleSpec -from importlib.util import find_spec -from datadog_lambda.cold_start_tracing import push_node, pop_node - - -def origin(module): - # type: (ModuleType) -> str - """Get the origin source file of the module.""" - try: - # DEV: Use object.__getattribute__ to avoid potential side-effects. - orig = abspath(object.__getattribute__(module, "__file__")) - except (AttributeError, TypeError): - # Module is probably only partially initialised, so we look at its - # spec instead - try: - # DEV: Use object.__getattribute__ to avoid potential side-effects. - orig = abspath(object.__getattribute__(module, "__spec__").origin) - except (AttributeError, ValueError, TypeError): - orig = None - - if orig is not None and isfile(orig): - if orig.endswith(".pyc"): - orig = orig[:-1] - return orig - - return "" - - - -class _ImportHookChainedLoader(Loader): - def __init__(self, loader): - # type: (Loader) -> None - self.loader = loader - - # # DEV: load_module is deprecated so we define it at runtime if also - # # defined by the default loader. We also check and define for the - # # methods that are supposed to replace the load_module functionality. - # if hasattr(loader, "load_module"): - # self.load_module = self._load_module # type: ignore[assignment] - if hasattr(loader, "create_module"): - self.create_module = self._create_module # type: ignore[assignment] - if hasattr(loader, "exec_module"): - self.exec_module = self._exec_module # type: ignore[assignment] - - def __getattribute__(self, name): - if name == "__class__": - # Make isinstance believe that self is also an instance of - # type(self.loader). This is required, e.g. by some tools, like - # slotscheck, that can handle known loaders only. - return self.loader.__class__ - - return super(_ImportHookChainedLoader, self).__getattribute__(name) - - def __getattr__(self, name): - # Proxy any other attribute access to the underlying loader. - return getattr(self.loader, name) - - def _create_module(self, spec): - # print(f"[CST] Create module for spec {spec}") - push_node(spec) - return self.loader.create_module(spec) - - def _exec_module(self, module): - # print(f"[CST] Exec module for spec {module}") - self.loader.exec_module(module) - pop_node(module) - - -class ModuleWatchdog(object): - - _instance = None # type: Optional[ModuleWatchdog] - - def __init__(self): - self._finding = set() # type: Set[str] - - def __repr__(self) -> str: - return "ModuleWatchdog" - - def _add_to_meta_path(self): - # type: () -> None - sys.meta_path.insert(0, self) # type: ignore[arg-type] - - @classmethod - def _find_in_meta_path(cls): - # type: () -> Optional[int] - for i, meta_path in enumerate(sys.meta_path): - if type(meta_path) is cls: - return i - return None - - @classmethod - def _remove_from_meta_path(cls): - # type: () -> None - i = cls._find_in_meta_path() - if i is not None: - sys.meta_path.pop(i) - - def find_module(self, fullname, path=None): - # type: (str, Optional[str]) -> Union[ModuleWatchdog, _ImportHookChainedLoader, None] - if fullname in self._finding: - return None - - self._finding.add(fullname) - # print(f"[CST] finding module for {fullname}") - try: - loader = getattr(find_spec(fullname), "loader", None) - if loader is not None: - if not isinstance(loader, _ImportHookChainedLoader): - loader = _ImportHookChainedLoader(loader) - - return loader - finally: - self._finding.remove(fullname) - - return None - - def find_spec(self, fullname, path=None, target=None): - # type: (str, Optional[str], Optional[ModuleType]) -> Optional[ModuleSpec] - if fullname in self._finding: - return None - - self._finding.add(fullname) - # print(f"[CST] finding spec for {fullname} ") - try: - spec = find_spec(fullname) - if spec is None: - return None - loader = getattr(spec, "loader", None) - - if loader is not None: - if not isinstance(loader, _ImportHookChainedLoader): - spec.loader = _ImportHookChainedLoader(loader) - # push_node(spec) - # cast(_ImportHookChainedLoader, spec.loader).add_callback(type(self), self.after_import) - - return spec - - finally: - self._finding.remove(fullname) - - - @classmethod - def _check_installed(cls): - # type: () -> None - if not cls.is_installed(): - raise RuntimeError("%s is not installed" % cls.__name__) - - @classmethod - def install(cls): - # type: () -> None - """Install the module watchdog.""" - if cls.is_installed(): - raise RuntimeError("%s is already installed" % cls.__name__) - this = cls() - cls._instance = this - this._add_to_meta_path() - - @classmethod - def is_installed(cls): - """Check whether this module watchdog class is installed.""" - return cls._instance is not None and type(cls._instance) is cls - - @classmethod - def uninstall(cls): - cls._check_installed() - cls._remove_from_meta_path() - cls._instance = None - - diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 0470a151..19ef8c04 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -1163,14 +1163,6 @@ def mark_trace_as_error_for_5xx_responses(context, status_code, span): span.error = 1 -from datadog_lambda.cold_start_tracing import ColdStartTracer, root_nodes, skips, total, n_spans - -def trace_cold_start(span, span_or_inferred_span, trace_ctx): - cold_start_tracer = ColdStartTracer(tracer, span_or_inferred_span, span.start_ns, trace_ctx) - print(f"[CST] ROOT NODES LEN: {len(root_nodes)}") - cold_start_tracer.trace(root_nodes) - print(f"Total {total}, {n_spans} SPANS, {skips} skips") - class InferredSpanInfo(object): BASE_NAME = "_inferred_span" SYNCHRONICITY = f"{BASE_NAME}.synchronicity" diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index cbc995b3..15e7a4c0 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -2,7 +2,6 @@ # under the Apache License Version 2.0. # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2019 Datadog, Inc. -print("BEFORE WRAPPER BASE64 Import") import base64 import os import logging @@ -12,7 +11,7 @@ from time import time_ns from datadog_lambda.extension import should_use_extension, flush_extension -from datadog_lambda.cold_start import set_cold_start, is_cold_start +from datadog_lambda.cold_start import set_cold_start, is_cold_start, ColdStartTracer from datadog_lambda.constants import ( TraceContextSource, XraySubsegment, @@ -38,7 +37,6 @@ create_inferred_span, InferredSpanInfo, is_authorizer_response, - trace_cold_start, tracer, ) from datadog_lambda.trigger import ( @@ -134,6 +132,25 @@ def __init__(self, func): self.decode_authorizer_context = ( os.environ.get("DD_DECODE_AUTHORIZER_CONTEXT", "true").lower() == "true" ) + self.cold_start_tracing = ( + os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true" + ) + self.min_cold_start_trace_duration = 3 + if "DD_MIN_COLD_START_DURATION" in os.environ: + try: + self.min_cold_start_trace_duration = int( + os.environ["DD_MIN_COLD_START_DURATION"] + ) + except: + logger.debug("Malformatted env DD_MIN_COLD_START_DURATION") + self.cold_start_trace_skip_lib = [] + if "DD_COLD_START_TRACE_SKIP_LIB" in os.environ: + try: + self.cold_start_trace_skip_lib = os.environ[ + "DD_COLD_START_TRACE_SKIP_LIB" + ].split(",") + except: + logger.debug("Malformatted for env DD_COLD_START_TRACE_SKIP_LIB") self.response = None if profiling_env_var: self.prof = profiler.Profiler(env=env_env_var, service=service_env_var) @@ -260,9 +277,12 @@ def _after(self, event, context): create_dd_dummy_metadata_subsegment( self.trigger_tags, XraySubsegment.LAMBDA_FUNCTION_TAGS_KEY ) + should_trace_cold_start = ( + dd_tracing_enabled and self.cold_start_tracing and is_cold_start() + ) + if should_trace_cold_start: + trace_ctx = tracer.current_trace_context() - trace_ctx = tracer.current_trace_context() - print(f"PROVIDING_TRACE_CONTEXT {trace_ctx}") if self.span: if dd_capture_lambda_payload_enabled: tag_object(self.span, "function.request", event) @@ -281,6 +301,20 @@ def _after(self, event, context): else: self.inferred_span.finish() + if should_trace_cold_start: + try: + following_span = self.span or self.inferred_span + ColdStartTracer( + tracer, + self.function_name, + following_span.start_ns, + trace_ctx, + self.min_cold_start_trace_duration, + self.cold_start_trace_skip_lib, + ).trace() + except Exception as e: + logger.debug("Failed to create cold start spans. %s", e) + if not self.flush_to_log or should_use_extension: flush_stats() if should_use_extension: @@ -292,9 +326,6 @@ def _after(self, event, context): ) logger.debug("datadog_lambda_wrapper _after() done") - span = self.span or self.inferred_span - trace_cold_start(self.span, span, trace_ctx) - except Exception: traceback.print_exc() From 7b707c00cee9c4b0921b946f2ccc3604c3efa44e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 9 Feb 2023 22:07:03 -0500 Subject: [PATCH 04/13] remove cold_start_tracing file and use cold_start file --- datadog_lambda/cold_start_tracing.py | 122 --------------------------- 1 file changed, 122 deletions(-) delete mode 100644 datadog_lambda/cold_start_tracing.py diff --git a/datadog_lambda/cold_start_tracing.py b/datadog_lambda/cold_start_tracing.py deleted file mode 100644 index cf884fe8..00000000 --- a/datadog_lambda/cold_start_tracing.py +++ /dev/null @@ -1,122 +0,0 @@ - -import os -import time -from typing import List -class ImportNode(object): - def __init__(self, name, origin, start_time_ns, end_time_ns= None): - self.module_name = name - self.file_name = origin - self.start_time_ns = start_time_ns - self.end_time_ns = end_time_ns - self.children = [] - -root_nodes = [] - -import_stack = [] - -total = 0 - -n_spans = 0 - -skips = 0 - -def push_node(module_spec): - global total - total += 1 - global root_nodes - node = ImportNode(module_spec.name, module_spec.origin, time.time_ns()) - global import_stack - # print(f'Pushing node for {module_spec.name},{len(import_stack)} on stack, {len(root_nodes)} roots') - if import_stack: - import_stack[-1].children.append(node) - import_stack.append(node) - -def pop_node(fullname): - global root_nodes - end_time_ns = time.time_ns() - global import_stack - node = import_stack.pop() - if node: - node.end_time_ns = end_time_ns - if not import_stack: # import_stack empty, a root node has been found - root_nodes.append(node) - # print(f'Poping node {len(import_stack)} left, {len(root_nodes)} roots') - - -class ColdStartTracer(object): - - def __init__(self, tracer, parent_span, cold_start_span_finish_time_ns, trace_ctx, min_duration = 3): - self._tracer = tracer - self.function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") - self.parent_span = parent_span - self.cold_start_span_finish_time_ns = cold_start_span_finish_time_ns - self.min_duration = min_duration - self.trace_ctx = trace_ctx - - def trace(self, root_nodes: List[ImportNode]): - cold_start_span_start_time_ns = root_nodes[0].start_time_ns - cold_start_span = self.create_cold_start_span(cold_start_span_start_time_ns) - for import_node in root_nodes: - self.trace_tree(import_node, cold_start_span) - - def trace_tree(self, import_node: ImportNode, parent_span): - if import_node.end_time_ns - import_node.start_time_ns < self.min_duration: - global skips - skips += 1 - return - span_kwargs = { - "service": "aws.lambda", - "resource": import_node.module_name, - "span_type": "aws.lambda.import", - } - span = self._tracer.trace("aws.lambda.import", **span_kwargs) - global n_spans - n_spans += 1 - - tags = { - "resource_names": import_node.module_name, - "resource.name": import_node.module_name, - "filename": import_node.file_name, - "operation_name": self.get_operation_name(import_node.file_name) - } - span.set_tags(tags) - if parent_span: - span.parent_id = parent_span.span_id - span.start_ns = import_node.start_time_ns - self.finish_ns(span, import_node.end_time_ns) - for child_node in import_node.children: - self.trace_tree(child_node, span) - - - def create_cold_start_span(self, start_time_ns): - span_kwargs = { - "service": "aws.lambda", - "resource": self.function_name, - "span_type": "aws.lambda.load", - } - span = self._tracer.trace("aws.lambda.load", **span_kwargs) - # tags = { - - # } - # span.set_tags(tags) - self._tracer.context_provider.activate(self.trace_ctx) # because it was reset by finish in wrapper - # trace_ctx = self._tracer.current_trace_context() - # print(f"SELF.TRACE_CONTEXT {self.trace_ctx} Trace_ctx: {trace_ctx}") - span.start_ns = start_time_ns - self.finish_ns(span, self.cold_start_span_finish_time_ns) - return span - - def finish_ns(self, span, finish_time_ns): - span.finish(finish_time_ns / 1e9) - self._tracer.context_provider.activate(self.trace_ctx) # reactivate required after each finish - - def get_operation_name(self, filename: str): - if filename.startswith("/opt/"): - return "aws.lambda.import_layer" - elif filename.startswith("/var/runtime/"): - return "aws.lambda.import_runtime" - elif '/' in filename: - return "aws.lambda.import" - else: - return "aws.lambda.import_core_module" - From 650b99dfd45ecc4ef495397f359f8866e1cd83bf Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 9 Feb 2023 22:09:44 -0500 Subject: [PATCH 05/13] bugfix --- datadog_lambda/__init__.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 6b9d21fe..65240507 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -1,20 +1,19 @@ from datadog_lambda.cold_start import is_cold_start, wrap_find_spec +import os -if is_cold_start(): - import os +if ( + is_cold_start() + and os.environ.get("DD_TRACE_ENABLED", "true").lower() == "true" + and os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true" +): + from sys import version_info, meta_path - if ( - os.environ.get("DD_TRACE_ENABLED", "true").lower() == "true" - and os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true" - ): - from sys import version_info, meta_path - - if version_info >= (3, 7): # current implementation only support version > 3.7 - for importer in meta_path: - try: - importer.find_spec = wrap_find_spec(importer.find_spec) - except: - pass + if version_info >= (3, 7): # current implementation only support version > 3.7 + for importer in meta_path: + try: + importer.find_spec = wrap_find_spec(importer.find_spec) + except: + pass # The minor version corresponds to the Lambda layer version. # E.g.,, version 0.5.0 gets packaged into layer version 5. From 52de38c355e9ae41e56cf8e228c66bc3eb7adb38 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Sun, 12 Feb 2023 14:59:50 -0500 Subject: [PATCH 06/13] add test_cold_start --- datadog_lambda/__init__.py | 18 +--- datadog_lambda/cold_start.py | 69 ++++++++----- datadog_lambda/wrapper.py | 2 - tests/test_cold_start.py | 195 +++++++++++++++++++++++++++++++++++ 4 files changed, 241 insertions(+), 43 deletions(-) create mode 100644 tests/test_cold_start.py diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 65240507..4772561d 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -1,19 +1,6 @@ -from datadog_lambda.cold_start import is_cold_start, wrap_find_spec -import os - -if ( - is_cold_start() - and os.environ.get("DD_TRACE_ENABLED", "true").lower() == "true" - and os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true" -): - from sys import version_info, meta_path +from datadog_lambda.cold_start import initialize_cold_start_tracing - if version_info >= (3, 7): # current implementation only support version > 3.7 - for importer in meta_path: - try: - importer.find_spec = wrap_find_spec(importer.find_spec) - except: - pass +initialize_cold_start_tracing() # The minor version corresponds to the Lambda layer version. # E.g.,, version 0.5.0 gets packaged into layer version 5. @@ -24,6 +11,7 @@ __version__ = importlib_metadata.version(__name__) +import os import logging logger = logging.getLogger(__name__) diff --git a/datadog_lambda/cold_start.py b/datadog_lambda/cold_start.py index 1c4aad08..ff3df874 100644 --- a/datadog_lambda/cold_start.py +++ b/datadog_lambda/cold_start.py @@ -1,12 +1,9 @@ import time import os -from importlib.abc import Loader from typing import List _cold_start = True _lambda_container_initialized = False -root_nodes = [] -import_stack = [] def set_cold_start(): @@ -39,14 +36,29 @@ def __init__(self, module_name, full_file_path, start_time_ns, end_time_ns=None) self.children = [] +root_nodes: List[ImportNode] = [] +import_stack: List[ImportNode] = [] +already_wrapped_loaders = set() +already_wrapped_importers = set() + + +def reset_node_stacks(): + global root_nodes + root_nodes = [] + global import_stack + import_stack = [] + + def push_node(module_name, file_path): node = ImportNode(module_name, file_path, time.time_ns()) + global import_stack if import_stack: import_stack[-1].children.append(node) import_stack.append(node) def pop_node(module_name): + global import_stack if not import_stack: return node = import_stack.pop() @@ -55,14 +67,15 @@ def pop_node(module_name): end_time_ns = time.time_ns() node.end_time_ns = end_time_ns if not import_stack: # import_stack empty, a root node has been found + global root_nodes root_nodes.append(node) def wrap_exec_module(original_exec_module): def wrapped_method(module): should_pop = False - spec = module.__spec__ try: + spec = module.__spec__ push_node(spec.name, spec.origin) should_pop = True except: @@ -76,39 +89,43 @@ def wrapped_method(module): return wrapped_method -def wrap_load_module(original_load_module): - def wrapped_method(fullname): - should_pop = False - try: - push_node(fullname, fullname) - should_pop = True - except: - pass - try: - return original_load_module(fullname) - finally: - if should_pop: - pop_node(fullname) - - return wrapped_method - - def wrap_find_spec(original_find_spec): def wrapped_find_spec(*args, **kwargs): spec = original_find_spec(*args, **kwargs) if spec is None: return None loader = getattr(spec, "loader", None) - if loader is not None: - if hasattr(loader, "exec_module") and hasattr(loader, "create_module"): - loader.exec_module = wrap_exec_module(loader.exec_module) - if hasattr(loader, "load_module"): # legacy support - loader.load_module = wrap_load_module(loader.load_module) + if loader is not None and loader not in already_wrapped_loaders: + if hasattr(loader, "exec_module"): + try: + loader.exec_module = wrap_exec_module(loader.exec_module) + already_wrapped_loaders.add(loader) + except: + pass return spec return wrapped_find_spec +def initialize_cold_start_tracing(): + if ( + is_cold_start() + and os.environ.get("DD_TRACE_ENABLED", "true").lower() == "true" + and os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true" + ): + from sys import version_info, meta_path + + if version_info >= (3, 7): # current implementation only support version > 3.7 + for importer in meta_path: + if importer in already_wrapped_importers: + continue + try: + importer.find_spec = wrap_find_spec(importer.find_spec) + already_wrapped_importers.add(importer) + except: + pass + + class ColdStartTracer(object): def __init__( self, diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 15e7a4c0..c363739b 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -109,7 +109,6 @@ def __new__(cls, func): def __init__(self, func): """Executes when the wrapped function gets wrapped""" try: - # patch_import() self.func = func self.flush_to_log = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" self.logs_injection = ( @@ -325,7 +324,6 @@ def _after(self, event, context): event.get("requestContext", {}).get("requestId") ) logger.debug("datadog_lambda_wrapper _after() done") - except Exception: traceback.print_exc() diff --git a/tests/test_cold_start.py b/tests/test_cold_start.py new file mode 100644 index 00000000..9d357fc3 --- /dev/null +++ b/tests/test_cold_start.py @@ -0,0 +1,195 @@ +import unittest +import datadog_lambda.cold_start as cold_start +from sys import modules, meta_path +import os +from unittest.mock import MagicMock + + +class TestColdStartTracingSetup(unittest.TestCase): + def import_a_module_and_check(self): + cold_start.initialize_cold_start_tracing() + cold_start.reset_node_stacks() + for module_name in ["ast", "dis", "inspect"]: + if module_name in modules: + del modules[module_name] + import inspect # import some package + + self.assertTrue(inspect.ismodule(inspect)) + self.assertEqual(len(cold_start.root_nodes), 1) + self.assertEqual(cold_start.root_nodes[0].module_name, "inspect") + self.assertEqual(len(cold_start.root_nodes[0].children), 2) + + def test_initialize_cold_start_tracing(self): + cold_start.initialize_cold_start_tracing() # testing double wrapping + self.import_a_module_and_check() + + def test_bad_importer_find_spec_attribute_error(self): + mock_importer = object() # AttributeError when accessing find_spec + meta_path.append(mock_importer) + cold_start.initialize_cold_start_tracing() # safe to call + meta_path.pop() + + def test_not_wrapping_case(self): + os.environ["DD_COLD_START_TRACING"] = "false" + mock_importer = object() + meta_path.append(mock_importer) + cold_start.initialize_cold_start_tracing() + self.assertFalse(mock_importer in cold_start.already_wrapped_importers) + meta_path.pop() + os.environ["DD_COLD_START_TRACING"] = "true" + + def test_exec_module_failure_case(self): + mock_importer = MagicMock() + mock_module_spec = MagicMock() + mock_module_spec.name = "test_name" + mock_loader = MagicMock() + + def bad_exec_module(*args, **kwargs): + raise Exception("Module failed to load") + + mock_loader.exec_module = bad_exec_module + mock_module_spec.loader = mock_loader + + def find_spec(*args, **kwargs): + return mock_module_spec + + mock_importer.find_spec = find_spec + meta_path.insert(0, mock_importer) + cold_start.initialize_cold_start_tracing() + cold_start.reset_node_stacks() + try: + import dummy_module + except Exception as e: + self.assertEqual(str(e), "Module failed to load") + self.assertEqual(len(cold_start.root_nodes), 1) + self.assertEqual(cold_start.root_nodes[0].module_name, mock_module_spec.name) + meta_path.pop(0) + + +class TestColdStartTracer(unittest.TestCase): + def setUp(self) -> None: + mock_tracer = MagicMock() + self.output_spans = [] + self.shared_mock_span = MagicMock() + self.shared_mock_span.current_spans = [] + self.finish_call_count = 0 + + def _finish(finish_time_s): + module_name = self.shared_mock_span.current_spans.pop() + self.output_spans.append(module_name) + self.finish_call_count += 1 + + self.shared_mock_span.finish = _finish + + def _trace(*args, **kwargs): + module_name = kwargs["resource"] + self.shared_mock_span.current_spans.append(module_name) + return self.shared_mock_span + + mock_tracer.trace = _trace + self.mock_activate = MagicMock() + mock_tracer.context_provider.activate = self.mock_activate + self.mock_trace_ctx = MagicMock() + self.first_node_start_time_ns = 1676217209680116000 + self.cold_start_tracer = cold_start.ColdStartTracer( + mock_tracer, + "unittest_cold_start", + self.first_node_start_time_ns + 2e9, + self.mock_trace_ctx, + 3, + ["ignored_module_a", "ignored_module_b"], + ) + self.test_time_unit = (self.cold_start_tracer.min_duration_ms + 1) * 1e6 + + def test_trace_empty_root_nodes(self): + self.cold_start_tracer.trace([]) + self.assertEqual(len(self.output_spans), 0) + + def test_trace_one_root_node_no_children(self): + node_0 = cold_start.ImportNode("node_0", None, self.first_node_start_time_ns) + node_0.end_time_ns = self.first_node_start_time_ns + 4e6 + self.cold_start_tracer.trace([node_0]) + self.mock_activate.assert_called_once_with(self.mock_trace_ctx) + self.assertEqual(self.output_spans, ["node_0", "unittest_cold_start"]) + + def test_trace_one_root_node_with_children(self): + node_0 = cold_start.ImportNode("node_0", None, self.first_node_start_time_ns) + node_0.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 2 + node_1 = cold_start.ImportNode("node_1", None, self.first_node_start_time_ns) + node_1.end_time_ns = self.first_node_start_time_ns + self.test_time_unit + node_2 = cold_start.ImportNode( + "node_2", None, self.first_node_start_time_ns + self.test_time_unit + ) + node_2.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 2 + node_3 = cold_start.ImportNode("node_3", None, self.first_node_start_time_ns) + node_3.end_time_ns = self.first_node_start_time_ns + self.test_time_unit + nodes = [node_0] + node_0.children = [node_1, node_2] + node_1.children = [node_3] + self.cold_start_tracer.trace(nodes) + self.mock_activate.assert_called_with(self.mock_trace_ctx) + self.assertEqual(self.finish_call_count, 5) + self.assertEqual(self.mock_activate.call_count, 2) + self.assertEqual( + self.output_spans, + ["node_3", "node_1", "node_2", "node_0", "unittest_cold_start"], + ) + + def test_trace_multiple_root_nodes(self): + node_0 = cold_start.ImportNode("node_0", None, self.first_node_start_time_ns) + node_0.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 2 + node_1 = cold_start.ImportNode( + "node_1", None, self.first_node_start_time_ns + self.test_time_unit * 2 + ) + node_1.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 3 + node_2 = cold_start.ImportNode("node_2", None, self.first_node_start_time_ns) + node_2.end_time_ns = self.first_node_start_time_ns + self.test_time_unit + node_3 = cold_start.ImportNode( + "node_3", None, self.first_node_start_time_ns + self.test_time_unit + ) + node_3.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 2 + node_4 = cold_start.ImportNode( + "node_4", None, self.first_node_start_time_ns + self.test_time_unit * 2 + ) + node_4.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 3 + nodes = [node_0, node_1] + node_0.children = [node_2, node_3] + node_1.children = [node_4] + self.cold_start_tracer.trace(nodes) + self.mock_activate.assert_called_with(self.mock_trace_ctx) + self.assertEqual(self.finish_call_count, 6) + self.assertEqual(self.mock_activate.call_count, 3) + self.assertEqual( + self.output_spans, + ["node_4", "node_1", "node_2", "node_3", "node_0", "unittest_cold_start"], + ) + + def test_trace_min_duration(self): + node_0 = cold_start.ImportNode("node_0", None, self.first_node_start_time_ns) + node_0.end_time_ns = ( + self.first_node_start_time_ns + + self.cold_start_tracer.min_duration_ms * 1e6 + - 1e5 + ) + self.cold_start_tracer.trace([node_0]) + self.mock_activate.assert_called_once_with(self.mock_trace_ctx) + self.assertEqual(self.output_spans, ["unittest_cold_start"]) + + def test_trace_ignore_libs(self): + node_0 = cold_start.ImportNode("node_0", None, self.first_node_start_time_ns) + node_0.end_time_ns = self.first_node_start_time_ns + self.test_time_unit + node_1 = cold_start.ImportNode( + "ignored_module_a", + None, + self.first_node_start_time_ns + self.test_time_unit, + ) + node_1.end_time_ns = self.first_node_start_time_ns + self.test_time_unit * 2 + node_2 = cold_start.ImportNode( + "ignored_module_b", None, self.first_node_start_time_ns + ) + node_2.end_time_ns = self.first_node_start_time_ns + self.test_time_unit + nodes = [node_0, node_1] + node_0.children = [node_2] + self.cold_start_tracer.trace(nodes) + self.mock_activate.assert_called_once_with(self.mock_trace_ctx) + self.assertEqual(self.output_spans, ["node_0", "unittest_cold_start"]) From 56bf8cbc00f4c267a1638bc45abc7a376e079c4a Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Sun, 12 Feb 2023 15:24:39 -0500 Subject: [PATCH 07/13] fix lint and integration tests --- datadog_lambda/__init__.py | 5 ++--- datadog_lambda/cold_start.py | 6 +++--- datadog_lambda/wrapper.py | 4 ++-- tests/integration/serverless.yml | 1 + 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 4772561d..20b42443 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -1,3 +1,5 @@ +import os +import logging from datadog_lambda.cold_start import initialize_cold_start_tracing initialize_cold_start_tracing() @@ -11,8 +13,5 @@ __version__ = importlib_metadata.version(__name__) -import os -import logging - logger = logging.getLogger(__name__) logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) diff --git a/datadog_lambda/cold_start.py b/datadog_lambda/cold_start.py index ff3df874..5ab9aeb2 100644 --- a/datadog_lambda/cold_start.py +++ b/datadog_lambda/cold_start.py @@ -78,7 +78,7 @@ def wrapped_method(module): spec = module.__spec__ push_node(spec.name, spec.origin) should_pop = True - except: + except Exception: pass try: return original_exec_module(module) @@ -100,7 +100,7 @@ def wrapped_find_spec(*args, **kwargs): try: loader.exec_module = wrap_exec_module(loader.exec_module) already_wrapped_loaders.add(loader) - except: + except Exception: pass return spec @@ -122,7 +122,7 @@ def initialize_cold_start_tracing(): try: importer.find_spec = wrap_find_spec(importer.find_spec) already_wrapped_importers.add(importer) - except: + except Exception: pass diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index c363739b..85183d3a 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -140,7 +140,7 @@ def __init__(self, func): self.min_cold_start_trace_duration = int( os.environ["DD_MIN_COLD_START_DURATION"] ) - except: + except Exception: logger.debug("Malformatted env DD_MIN_COLD_START_DURATION") self.cold_start_trace_skip_lib = [] if "DD_COLD_START_TRACE_SKIP_LIB" in os.environ: @@ -148,7 +148,7 @@ def __init__(self, func): self.cold_start_trace_skip_lib = os.environ[ "DD_COLD_START_TRACE_SKIP_LIB" ].split(",") - except: + except Exception: logger.debug("Malformatted for env DD_COLD_START_TRACE_SKIP_LIB") self.response = None if profiling_env_var: diff --git a/tests/integration/serverless.yml b/tests/integration/serverless.yml index 9bb8a79b..27112f54 100644 --- a/tests/integration/serverless.yml +++ b/tests/integration/serverless.yml @@ -11,6 +11,7 @@ provider: DD_TRACE_ENABLED: true DD_API_KEY: ${env:DD_API_KEY} DD_TRACE_MANAGED_SERVICES: true + DD_COLD_START_TRACING: false timeout: 15 deploymentBucket: name: integration-tests-serververless-deployment-bucket From 30f746bcad9706ca83e15bd02c2590b70eca08bd Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Sun, 12 Feb 2023 15:49:56 -0500 Subject: [PATCH 08/13] make sure loader is Hashable in order to use in a set --- datadog_lambda/cold_start.py | 12 ++++++------ tests/test_cold_start.py | 13 +++++++++++-- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/datadog_lambda/cold_start.py b/datadog_lambda/cold_start.py index 5ab9aeb2..31d9f860 100644 --- a/datadog_lambda/cold_start.py +++ b/datadog_lambda/cold_start.py @@ -1,6 +1,6 @@ import time import os -from typing import List +from typing import List, Hashable _cold_start = True _lambda_container_initialized = False @@ -39,7 +39,6 @@ def __init__(self, module_name, full_file_path, start_time_ns, end_time_ns=None) root_nodes: List[ImportNode] = [] import_stack: List[ImportNode] = [] already_wrapped_loaders = set() -already_wrapped_importers = set() def reset_node_stacks(): @@ -95,7 +94,11 @@ def wrapped_find_spec(*args, **kwargs): if spec is None: return None loader = getattr(spec, "loader", None) - if loader is not None and loader not in already_wrapped_loaders: + if ( + loader is not None + and isinstance(loader, Hashable) + and loader not in already_wrapped_loaders + ): if hasattr(loader, "exec_module"): try: loader.exec_module = wrap_exec_module(loader.exec_module) @@ -117,11 +120,8 @@ def initialize_cold_start_tracing(): if version_info >= (3, 7): # current implementation only support version > 3.7 for importer in meta_path: - if importer in already_wrapped_importers: - continue try: importer.find_spec = wrap_find_spec(importer.find_spec) - already_wrapped_importers.add(importer) except Exception: pass diff --git a/tests/test_cold_start.py b/tests/test_cold_start.py index 9d357fc3..3ee66378 100644 --- a/tests/test_cold_start.py +++ b/tests/test_cold_start.py @@ -31,10 +31,19 @@ def test_bad_importer_find_spec_attribute_error(self): def test_not_wrapping_case(self): os.environ["DD_COLD_START_TRACING"] = "false" - mock_importer = object() + mock_importer = MagicMock() + mock_module_spec = MagicMock() + mock_module_spec.name = "test_name" + mock_loader = object() + mock_module_spec.loader = mock_loader + + def find_spec(*args, **kwargs): + return mock_module_spec + + mock_importer.find_spec = find_spec meta_path.append(mock_importer) cold_start.initialize_cold_start_tracing() - self.assertFalse(mock_importer in cold_start.already_wrapped_importers) + self.assertFalse(mock_loader in cold_start.already_wrapped_loaders) meta_path.pop() os.environ["DD_COLD_START_TRACING"] = "true" From fa86a0cbc17bb2ef6f8c2366cb81c9efb987aa69 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Sun, 12 Feb 2023 15:56:37 -0500 Subject: [PATCH 09/13] update test statements --- tests/test_cold_start.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/test_cold_start.py b/tests/test_cold_start.py index 3ee66378..31504f84 100644 --- a/tests/test_cold_start.py +++ b/tests/test_cold_start.py @@ -6,7 +6,9 @@ class TestColdStartTracingSetup(unittest.TestCase): - def import_a_module_and_check(self): + + def test_initialize_cold_start_tracing(self): + cold_start.initialize_cold_start_tracing() # testing double wrapping cold_start.initialize_cold_start_tracing() cold_start.reset_node_stacks() for module_name in ["ast", "dis", "inspect"]: @@ -17,11 +19,6 @@ def import_a_module_and_check(self): self.assertTrue(inspect.ismodule(inspect)) self.assertEqual(len(cold_start.root_nodes), 1) self.assertEqual(cold_start.root_nodes[0].module_name, "inspect") - self.assertEqual(len(cold_start.root_nodes[0].children), 2) - - def test_initialize_cold_start_tracing(self): - cold_start.initialize_cold_start_tracing() # testing double wrapping - self.import_a_module_and_check() def test_bad_importer_find_spec_attribute_error(self): mock_importer = object() # AttributeError when accessing find_spec From 36c8ef87e1c5d5afc5d5e0e27c9101bce0ab8cee Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Sun, 12 Feb 2023 15:57:21 -0500 Subject: [PATCH 10/13] format --- tests/test_cold_start.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_cold_start.py b/tests/test_cold_start.py index 31504f84..22e7dc9c 100644 --- a/tests/test_cold_start.py +++ b/tests/test_cold_start.py @@ -6,7 +6,6 @@ class TestColdStartTracingSetup(unittest.TestCase): - def test_initialize_cold_start_tracing(self): cold_start.initialize_cold_start_tracing() # testing double wrapping cold_start.initialize_cold_start_tracing() From c9f2b3aedd80c4e65792f01d65d0b20eec31621c Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 13 Feb 2023 14:36:58 -0500 Subject: [PATCH 11/13] add logging, add default skip libs, change the following-span order --- datadog_lambda/cold_start.py | 6 ++++-- datadog_lambda/wrapper.py | 9 +++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/datadog_lambda/cold_start.py b/datadog_lambda/cold_start.py index 31d9f860..4d6e3e56 100644 --- a/datadog_lambda/cold_start.py +++ b/datadog_lambda/cold_start.py @@ -1,6 +1,8 @@ import time import os from typing import List, Hashable +import logging +logger = logging.getLogger(__name__) _cold_start = True _lambda_container_initialized = False @@ -103,8 +105,8 @@ def wrapped_find_spec(*args, **kwargs): try: loader.exec_module = wrap_exec_module(loader.exec_module) already_wrapped_loaders.add(loader) - except Exception: - pass + except Exception as e: + logger.debug("Failed to wrap the loader. %s", e) return spec return wrapped_find_spec diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 85183d3a..576a1040 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -145,9 +145,10 @@ def __init__(self, func): self.cold_start_trace_skip_lib = [] if "DD_COLD_START_TRACE_SKIP_LIB" in os.environ: try: - self.cold_start_trace_skip_lib = os.environ[ - "DD_COLD_START_TRACE_SKIP_LIB" - ].split(",") + self.cold_start_trace_skip_lib = os.environ.get( + "DD_COLD_START_TRACE_SKIP_LIB", + "datadog_lambda.extension,datadog_lambda.metric,datadog_lambda.patch" + ).split(",") except Exception: logger.debug("Malformatted for env DD_COLD_START_TRACE_SKIP_LIB") self.response = None @@ -302,7 +303,7 @@ def _after(self, event, context): if should_trace_cold_start: try: - following_span = self.span or self.inferred_span + following_span = self.inferred_span or self.span ColdStartTracer( tracer, self.function_name, From bae46262ac402a1005a752fdc5cc96d96a752d0d Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 13 Feb 2023 14:39:15 -0500 Subject: [PATCH 12/13] format --- datadog_lambda/cold_start.py | 1 + datadog_lambda/wrapper.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/cold_start.py b/datadog_lambda/cold_start.py index 4d6e3e56..fdb43b81 100644 --- a/datadog_lambda/cold_start.py +++ b/datadog_lambda/cold_start.py @@ -2,6 +2,7 @@ import os from typing import List, Hashable import logging + logger = logging.getLogger(__name__) _cold_start = True diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 576a1040..34d1c272 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -147,7 +147,7 @@ def __init__(self, func): try: self.cold_start_trace_skip_lib = os.environ.get( "DD_COLD_START_TRACE_SKIP_LIB", - "datadog_lambda.extension,datadog_lambda.metric,datadog_lambda.patch" + "datadog_lambda.extension,datadog_lambda.metric,datadog_lambda.patch", ).split(",") except Exception: logger.debug("Malformatted for env DD_COLD_START_TRACE_SKIP_LIB") From fe06e818c8ec89e0c11a431875abe7aa256d1baf Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 13 Feb 2023 15:26:50 -0500 Subject: [PATCH 13/13] update cold_start_trace_skip_lib --- datadog_lambda/wrapper.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 34d1c272..fb849cec 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -142,13 +142,15 @@ def __init__(self, func): ) except Exception: logger.debug("Malformatted env DD_MIN_COLD_START_DURATION") - self.cold_start_trace_skip_lib = [] + self.cold_start_trace_skip_lib = [ + "ddtrace.internal.compat", + "ddtrace.filters", + ] if "DD_COLD_START_TRACE_SKIP_LIB" in os.environ: try: - self.cold_start_trace_skip_lib = os.environ.get( - "DD_COLD_START_TRACE_SKIP_LIB", - "datadog_lambda.extension,datadog_lambda.metric,datadog_lambda.patch", - ).split(",") + self.cold_start_trace_skip_lib = os.environ[ + "DD_COLD_START_TRACE_SKIP_LIB" + ].split(",") except Exception: logger.debug("Malformatted for env DD_COLD_START_TRACE_SKIP_LIB") self.response = None @@ -303,7 +305,7 @@ def _after(self, event, context): if should_trace_cold_start: try: - following_span = self.inferred_span or self.span + following_span = self.span or self.inferred_span ColdStartTracer( tracer, self.function_name,