Skip to content

Explicit trace ID propagation for SFN #526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datadog_lambda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

# Datadog trace sampling priority


# Datadog trace sampling priority
class SamplingPriority(object):
USER_REJECT = -1
AUTO_REJECT = 0
Expand All @@ -18,6 +17,7 @@ class TraceHeader(object):
TRACE_ID = "x-datadog-trace-id"
PARENT_ID = "x-datadog-parent-id"
SAMPLING_PRIORITY = "x-datadog-sampling-priority"
TAGS = "x-datadog-tags"


# X-Ray subsegment to save Datadog trace metadata
Expand Down
75 changes: 71 additions & 4 deletions datadog_lambda/span_pointers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import logging
from typing import List

from ddtrace._trace.utils_botocore.span_pointers import (
from ddtrace._trace.utils_botocore.span_pointers.dynamodb import (
_aws_dynamodb_item_span_pointer_description,
)
from ddtrace._trace.utils_botocore.span_pointers.s3 import (
_aws_s3_object_span_pointer_description,
)
from ddtrace._trace._span_pointer import _SpanPointerDirection
Expand All @@ -21,10 +24,13 @@ def calculate_span_pointers(
if event_source.equals(EventTypes.S3):
return _calculate_s3_span_pointers_for_event(event)

elif event_source.equals(EventTypes.DYNAMODB):
return _calculate_dynamodb_span_pointers_for_event(event)

except Exception as e:
logger.warning(
"failed to calculate span pointers for event: %s",
str(e),
e,
)

return []
Expand Down Expand Up @@ -69,7 +75,7 @@ def _calculate_s3_span_pointers_for_object_created_s3_information(
except KeyError as e:
logger.warning(
"missing s3 information required to make a span pointer: %s",
str(e),
e,
)
return []

Expand All @@ -86,6 +92,67 @@ def _calculate_s3_span_pointers_for_object_created_s3_information(
except Exception as e:
logger.warning(
"failed to generate S3 span pointer: %s",
str(e),
e,
)
return []


def _calculate_dynamodb_span_pointers_for_event(event) -> List[_SpanPointerDescription]:
    """Collect the span pointers for every record of a DynamoDB stream event.

    Example event:
    https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Each entry under the event's "Records" key is handed to
    ``_calculate_dynamodb_span_pointers_for_event_record`` and the per-record
    results are concatenated in order. An event without a "Records" key
    yields an empty list.
    """
    span_pointers = []
    for stream_record in event.get("Records", []):
        span_pointers.extend(
            _calculate_dynamodb_span_pointers_for_event_record(stream_record)
        )
    return span_pointers


def _calculate_dynamodb_span_pointers_for_event_record(
    record,
) -> List[_SpanPointerDescription]:
    """Build the span pointer for a single DynamoDB stream record.

    Returns a one-element list holding the pointer description on success.
    Any failure, whether while reading the table name and keys from the
    record or while building the description, is logged as a warning and
    results in an empty list; nothing is raised to the caller.
    """
    # Step 1: pull the identifying information out of the record.
    try:
        stream_table = _extract_table_name_from_dynamodb_stream_record(record)
        item_keys = record["dynamodb"]["Keys"]
    except Exception as err:
        logger.warning(
            "missing DynamoDB information required to make a span pointer: %s",
            err,
        )
        return []

    # Step 2: turn that information into a span pointer description.
    try:
        pointer = _aws_dynamodb_item_span_pointer_description(
            pointer_direction=_SpanPointerDirection.UPSTREAM,
            table_name=stream_table,
            primary_key=item_keys,
        )
    except Exception as err:
        logger.warning(
            "failed to generate DynamoDB span pointer: %s",
            err,
        )
        return []

    return [pointer]


def _extract_table_name_from_dynamodb_stream_record(record) -> str:
# Example eventSourceARN:
# arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525
event_source_arn = record["eventSourceARN"]

[_arn, _aws, _dynamodb, _region, _account, dynamodb_info] = event_source_arn.split(
":", maxsplit=5
)
if _arn != "arn" or _aws != "aws" or _dynamodb != "dynamodb":
raise ValueError(f"unexpected eventSourceARN format: {event_source_arn}")

[_table, table_name, _stream, _timestamp] = dynamodb_info.split("/")
if _table != "table" or _stream != "stream":
raise ValueError(f"unexpected eventSourceARN format: {event_source_arn}")

return table_name
40 changes: 29 additions & 11 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TraceContextSource,
XrayDaemon,
Headers,
TraceHeader,
)
from datadog_lambda.xray import (
send_segment,
Expand Down Expand Up @@ -380,10 +381,33 @@ def extract_context_from_step_functions(event, lambda_context):
execution_id = event.get("Execution").get("Id")
state_name = event.get("State").get("Name")
state_entered_time = event.get("State").get("EnteredTime")
# returning 128 bits since the 128-bit traceId will be broken up into
# the traditional traceId and the _dd.p.tid tag
# https://github.com/DataDog/dd-trace-py/blob/3e34d21cb9b5e1916e549047158cb119317b96ab/ddtrace/propagation/http.py#L232-L240
trace_id = _deterministic_sha256_hash(execution_id, LOWER_64_BITS)
meta = {}

if "_datadog" in event:
trace_header = event.get("_datadog")
if TraceHeader.TRACE_ID in trace_header:
# use the trace ID from the top-most parent when it exists
trace_id = int(trace_header.get(TraceHeader.TRACE_ID))
tags = trace_header.get(TraceHeader.TAGS, "")
for tag in tags.split(","):
tag_key, tag_val = tag.split("=")
meta[tag_key] = tag_val
elif "x-datadog-execution-arn" in trace_header:
root_execution_id = trace_header.get("x-datadog-execution-arn")
trace_id = _deterministic_sha256_hash(root_execution_id, LOWER_64_BITS)
meta["_dd.p.tid"] = hex(
_deterministic_sha256_hash(root_execution_id, HIGHER_64_BITS)
)[2:]
else:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole branch should be the old behavior

# returning 128 bits since the 128-bit traceId will be broken up into
# the traditional traceId and the _dd.p.tid tag
# https://github.com/DataDog/dd-trace-py/blob/3e34d21cb9b5e1916e549047158cb119317b96ab/ddtrace/propagation/http.py#L232-L240
trace_id = _deterministic_sha256_hash(execution_id, LOWER_64_BITS)
# take the higher 64 bits as _dd.p.tid tag and use hex to encode
# [2:] to remove '0x' in the hex str
meta["_dd.p.tid"] = hex(
_deterministic_sha256_hash(execution_id, HIGHER_64_BITS)
)[2:]

parent_id = _deterministic_sha256_hash(
f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS
Expand All @@ -394,13 +418,7 @@ def extract_context_from_step_functions(event, lambda_context):
trace_id=trace_id,
span_id=parent_id,
sampling_priority=sampling_priority,
# take the higher 64 bits as _dd.p.tid tag and use hex to encode
# [2:] to remove '0x' in the hex str
meta={
"_dd.p.tid": hex(
_deterministic_sha256_hash(execution_id, HIGHER_64_BITS)
)[2:]
},
meta=meta,
)
except Exception as e:
logger.debug("The Step Functions trace extractor returned with error %s", e)
Expand Down
6 changes: 6 additions & 0 deletions datadog_lambda/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ def extract_http_tags(event):
if headers and headers.get("Referer"):
http_tags["http.referer"] = headers.get("Referer")

# Try to get `routeKey` from API GW v2; otherwise try to get `resource` from API GW v1
route = event.get("routeKey") or event.get("resource")
if route:
# "GET /my/endpoint" = > "/my/endpoint"
http_tags["http.route"] = route.split(" ")[-1]

return http_tags


Expand Down
Loading
Loading