Skip to content

feat: EventBridge to SQS support #358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,29 @@ def create_sns_event(message):

def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
"""
Extract Datadog trace context from the first SQS message attributes.
Extract Datadog trace context from an SQS event.

The extraction chain goes as follows:
EB => SQS (First records body contains EB context), or
SNS => SQS (First records body contains SNS context), or
SQS or SNS (`messageAttributes` for SQS context,
`MessageAttributes` for SNS context), else
Lambda Context.

Falls back to lambda context if no trace data is found in the SQS message attributes.
"""

# EventBridge => SQS
try:
(
trace_id,
parent_id,
sampling_priority,
) = _extract_context_from_eventbridge_sqs_event(event)
return trace_id, parent_id, sampling_priority
except Exception:
logger.debug("Failed extracting context as EventBridge to SQS.")

try:
first_record = event.get("Records")[0]

Expand Down Expand Up @@ -283,6 +302,30 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
return extract_context_from_lambda_context(lambda_context)


def _extract_context_from_eventbridge_sqs_event(event):
"""
Extracts Datadog trace context from an SQS event triggered by
EventBridge.

This is only possible if first record in `Records` contains a
`body` field which contains the EventBridge `detail` as a JSON string.
"""
try:
first_record = event.get("Records")[0]
if "body" in first_record:
body_str = first_record.get("body", {})
body = json.loads(body_str)

detail = body.get("detail")
dd_context = detail.get("_datadog")
trace_id = dd_context.get(TraceHeader.TRACE_ID)
parent_id = dd_context.get(TraceHeader.PARENT_ID)
sampling_priority = dd_context.get(TraceHeader.SAMPLING_PRIORITY)
return trace_id, parent_id, sampling_priority
except Exception:
raise


def extract_context_from_eventbridge_event(event, lambda_context):
"""
Extract datadog trace context from an EventBridge message's Details.
Expand Down Expand Up @@ -995,21 +1038,33 @@ def create_inferred_span_from_sqs_event(event, context):
}
start_time = int(request_time_epoch) / 1000

# logic to deal with SNS => SQS event
sns_span = None
upstream_span = None
if "body" in event_record:
body_str = event_record.get("body", {})
try:
body = json.loads(body_str)

# logic to deal with SNS => SQS event
if body.get("Type", "") == "Notification" and "TopicArn" in body:
logger.debug("Found SNS message inside SQS event")
sns_span = create_inferred_span_from_sns_event(
upstream_span = create_inferred_span_from_sns_event(
create_sns_event(body), context
)
sns_span.finish(finish_time=start_time)
upstream_span.finish(finish_time=start_time)

# EventBridge => SQS
elif body.get("detail"):
detail = body.get("detail")
if detail.get("_datadog"):
logger.debug("Found an EventBridge message inside SQS event")
upstream_span = create_inferred_span_from_eventbridge_event(
body, context
)
upstream_span.finish(finish_time=start_time)

except Exception as e:
logger.debug(
"Unable to create SNS span from SQS message, with error %s" % e
"Unable to create upstream span from SQS message, with error %s" % e
)
pass

Expand All @@ -1021,8 +1076,8 @@ def create_inferred_span_from_sqs_event(event, context):
if span:
span.set_tags(tags)
span.start = start_time
if sns_span:
span.parent_id = sns_span.span_id
if upstream_span:
span.parent_id = upstream_span.span_id

return span

Expand Down
21 changes: 21 additions & 0 deletions tests/event_samples/eventbridge-sqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Records": [
{
"messageId": "e995e54f-1724-41fa-82c0-8b81821f854e",
"receiptHandle": "AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=",
"body": "{\"version\":\"0\",\"id\":\"af718b2a-b987-e8c0-7a2b-a188fad2661a\",\"detail-type\":\"my.Detail\",\"source\":\"my.Source\",\"account\":\"425362996713\",\"time\":\"2023-08-03T22:49:03Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"text\":\"Hello, world!\",\"_datadog\":{\"x-datadog-trace-id\":\"7379586022458917877\",\"x-datadog-parent-id\":\"2644033662113726488\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-tags\":\"_dd.p.dm=-0\",\"traceparent\":\"00-000000000000000066698e63821a03f5-24b17e9b6476c018-01\",\"tracestate\":\"dd=t.dm:-0;s:1\"}}}",
"attributes": {
"ApproximateReceiveCount": "1",
"AWSTraceHeader": "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0",
"SentTimestamp": "1691102943638",
"SenderId": "AIDAJXNJGGKNS7OSV23OI",
"ApproximateFirstReceiveTimestamp": "1691102943647"
},
"messageAttributes": {},
"md5OfBody": "93d9f0cd8886d1e000a1a0b7007bffc4",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue",
"awsRegion": "us-east-1"
}
]
}
50 changes: 50 additions & 0 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,44 @@ def test_create_inferred_span_from_eventbridge_event(self):
self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self")
self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async")

def test_create_inferred_span_from_eventbridge_sqs_event(self):
    # Build the inferred span for the EventBridge => SQS sample event and
    # check the tags and attributes set on the resulting SQS span.
    sample_path = event_samples + "eventbridge-sqs" + ".json"
    with open(sample_path, "r") as sample_file:
        payload = json.load(sample_file)
    mock_ctx = get_mock_context()
    mock_ctx.aws_request_id = "123"

    inferred = create_inferred_span(payload, mock_ctx)

    queue_name = "eventbridge-sqs-queue"
    self.assertEqual(inferred.get_tag("operation_name"), "aws.sqs")
    self.assertEqual(inferred.service, "sqs")
    # These HTTP-oriented tags are expected to be unset on this span.
    for unset_tag in ("http.url", "endpoint", "http.method"):
        self.assertEqual(inferred.get_tag(unset_tag), None)
    self.assertEqual(inferred.get_tag("resource_names"), queue_name)
    self.assertEqual(inferred.get_tag("request_id"), None)
    self.assertEqual(inferred.get_tag("queuename"), queue_name)
    self.assertEqual(
        inferred.get_tag("event_source_arn"),
        "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue",
    )
    self.assertEqual(inferred.get_tag("sender_id"), "AIDAJXNJGGKNS7OSV23OI")
    # 1691102943.638 is the sample's SentTimestamp (milliseconds) in seconds.
    self.assertEqual(inferred.start, 1691102943.638)
    self.assertEqual(inferred.span_type, "web")
    self.assertEqual(inferred.get_tag(InferredSpanInfo.TAG_SOURCE), "self")
    self.assertEqual(inferred.get_tag(InferredSpanInfo.SYNCHRONICITY), "async")

def test_extract_context_from_eventbridge_event(self):
event_sample_source = "eventbridge-custom"
test_file = event_samples + event_sample_source + ".json"
Expand All @@ -1806,6 +1844,18 @@ def test_extract_dd_trace_context_for_eventbridge(self):
self.assertEqual(context["trace-id"], "12345")
self.assertEqual(context["parent-id"], "67890")

def test_extract_context_from_eventbridge_sqs_event(self):
    # The EventBridge => SQS sample carries its Datadog headers inside the
    # `_datadog` entry of the EventBridge detail embedded in the SQS body;
    # the extracted context must match those values.
    sample_path = event_samples + "eventbridge-sqs" + ".json"
    with open(sample_path, "r") as sample_file:
        payload = json.load(sample_file)

    mock_ctx = get_mock_context()
    # Only the context is inspected; the other two results are unpacked to
    # keep the three-value return shape checked.
    dd_context, _source, _event_type = extract_dd_trace_context(payload, mock_ctx)

    expected = {
        "trace-id": "7379586022458917877",
        "parent-id": "2644033662113726488",
        "sampling-priority": "1",
    }
    for key, value in expected.items():
        self.assertEqual(dd_context[key], value)

def test_extract_context_from_sqs_event_with_string_msg_attr(self):
event_sample_source = "sqs-string-msg-attribute"
test_file = event_samples + event_sample_source + ".json"
Expand Down