Skip to content

Commit 63f7082

Browse files
authored
feat: EventBridge to SQS support (#358)
* add eventbridge to sqs inferred span support also allowed to properly extract trace context * add unit tests * lint and fix event * lint * fix unpacking * lint * update docs
1 parent faf5b6b commit 63f7082

File tree

3 files changed

+134
-8
lines changed

3 files changed

+134
-8
lines changed

datadog_lambda/tracing.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,29 @@ def create_sns_event(message):
229229

230230
def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
231231
"""
232-
Extract Datadog trace context from the first SQS message attributes.
232+
Extract Datadog trace context from an SQS event.
233+
234+
The extraction chain goes as follows:
235+
EB => SQS (First records body contains EB context), or
236+
SNS => SQS (First records body contains SNS context), or
237+
SQS or SNS (`messageAttributes` for SQS context,
238+
`MessageAttributes` for SNS context), else
239+
Lambda Context.
233240
234241
Falls back to lambda context if no trace data is found in the SQS message attributes.
235242
"""
243+
244+
# EventBridge => SQS
245+
try:
246+
(
247+
trace_id,
248+
parent_id,
249+
sampling_priority,
250+
) = _extract_context_from_eventbridge_sqs_event(event)
251+
return trace_id, parent_id, sampling_priority
252+
except Exception:
253+
logger.debug("Failed extracting context as EventBridge to SQS.")
254+
236255
try:
237256
first_record = event.get("Records")[0]
238257

@@ -283,6 +302,30 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
283302
return extract_context_from_lambda_context(lambda_context)
284303

285304

305+
def _extract_context_from_eventbridge_sqs_event(event):
306+
"""
307+
Extracts Datadog trace context from an SQS event triggered by
308+
EventBridge.
309+
310+
This is only possible if first record in `Records` contains a
311+
`body` field which contains the EventBridge `detail` as a JSON string.
312+
"""
313+
try:
314+
first_record = event.get("Records")[0]
315+
if "body" in first_record:
316+
body_str = first_record.get("body", {})
317+
body = json.loads(body_str)
318+
319+
detail = body.get("detail")
320+
dd_context = detail.get("_datadog")
321+
trace_id = dd_context.get(TraceHeader.TRACE_ID)
322+
parent_id = dd_context.get(TraceHeader.PARENT_ID)
323+
sampling_priority = dd_context.get(TraceHeader.SAMPLING_PRIORITY)
324+
return trace_id, parent_id, sampling_priority
325+
except Exception:
326+
raise
327+
328+
286329
def extract_context_from_eventbridge_event(event, lambda_context):
287330
"""
288331
Extract datadog trace context from an EventBridge message's Details.
@@ -995,21 +1038,33 @@ def create_inferred_span_from_sqs_event(event, context):
9951038
}
9961039
start_time = int(request_time_epoch) / 1000
9971040

998-
# logic to deal with SNS => SQS event
999-
sns_span = None
1041+
upstream_span = None
10001042
if "body" in event_record:
10011043
body_str = event_record.get("body", {})
10021044
try:
10031045
body = json.loads(body_str)
1046+
1047+
# logic to deal with SNS => SQS event
10041048
if body.get("Type", "") == "Notification" and "TopicArn" in body:
10051049
logger.debug("Found SNS message inside SQS event")
1006-
sns_span = create_inferred_span_from_sns_event(
1050+
upstream_span = create_inferred_span_from_sns_event(
10071051
create_sns_event(body), context
10081052
)
1009-
sns_span.finish(finish_time=start_time)
1053+
upstream_span.finish(finish_time=start_time)
1054+
1055+
# EventBridge => SQS
1056+
elif body.get("detail"):
1057+
detail = body.get("detail")
1058+
if detail.get("_datadog"):
1059+
logger.debug("Found an EventBridge message inside SQS event")
1060+
upstream_span = create_inferred_span_from_eventbridge_event(
1061+
body, context
1062+
)
1063+
upstream_span.finish(finish_time=start_time)
1064+
10101065
except Exception as e:
10111066
logger.debug(
1012-
"Unable to create SNS span from SQS message, with error %s" % e
1067+
"Unable to create upstream span from SQS message, with error %s" % e
10131068
)
10141069
pass
10151070

@@ -1021,8 +1076,8 @@ def create_inferred_span_from_sqs_event(event, context):
10211076
if span:
10221077
span.set_tags(tags)
10231078
span.start = start_time
1024-
if sns_span:
1025-
span.parent_id = sns_span.span_id
1079+
if upstream_span:
1080+
span.parent_id = upstream_span.span_id
10261081

10271082
return span
10281083

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "e995e54f-1724-41fa-82c0-8b81821f854e",
5+
"receiptHandle": "AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=",
6+
"body": "{\"version\":\"0\",\"id\":\"af718b2a-b987-e8c0-7a2b-a188fad2661a\",\"detail-type\":\"my.Detail\",\"source\":\"my.Source\",\"account\":\"425362996713\",\"time\":\"2023-08-03T22:49:03Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"text\":\"Hello, world!\",\"_datadog\":{\"x-datadog-trace-id\":\"7379586022458917877\",\"x-datadog-parent-id\":\"2644033662113726488\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-tags\":\"_dd.p.dm=-0\",\"traceparent\":\"00-000000000000000066698e63821a03f5-24b17e9b6476c018-01\",\"tracestate\":\"dd=t.dm:-0;s:1\"}}}",
7+
"attributes": {
8+
"ApproximateReceiveCount": "1",
9+
"AWSTraceHeader": "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0",
10+
"SentTimestamp": "1691102943638",
11+
"SenderId": "AIDAJXNJGGKNS7OSV23OI",
12+
"ApproximateFirstReceiveTimestamp": "1691102943647"
13+
},
14+
"messageAttributes": {},
15+
"md5OfBody": "93d9f0cd8886d1e000a1a0b7007bffc4",
16+
"eventSource": "aws:sqs",
17+
"eventSourceARN": "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue",
18+
"awsRegion": "us-east-1"
19+
}
20+
]
21+
}

tests/test_tracing.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,6 +1785,44 @@ def test_create_inferred_span_from_eventbridge_event(self):
17851785
self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self")
17861786
self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async")
17871787

1788+
def test_create_inferred_span_from_eventbridge_sqs_event(self):
1789+
event_sample_name = "eventbridge-sqs"
1790+
test_file = event_samples + event_sample_name + ".json"
1791+
with open(test_file, "r") as event:
1792+
event = json.load(event)
1793+
ctx = get_mock_context()
1794+
ctx.aws_request_id = "123"
1795+
span = create_inferred_span(event, ctx)
1796+
self.assertEqual(span.get_tag("operation_name"), "aws.sqs")
1797+
self.assertEqual(
1798+
span.service,
1799+
"sqs",
1800+
)
1801+
self.assertEqual(
1802+
span.get_tag("http.url"),
1803+
None,
1804+
)
1805+
self.assertEqual(span.get_tag("endpoint"), None)
1806+
self.assertEqual(span.get_tag("http.method"), None)
1807+
self.assertEqual(
1808+
span.get_tag("resource_names"),
1809+
"eventbridge-sqs-queue",
1810+
)
1811+
self.assertEqual(span.get_tag("request_id"), None)
1812+
self.assertEqual(span.get_tag("queuename"), "eventbridge-sqs-queue")
1813+
self.assertEqual(
1814+
span.get_tag("event_source_arn"),
1815+
"arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue",
1816+
)
1817+
self.assertEqual(
1818+
span.get_tag("sender_id"),
1819+
"AIDAJXNJGGKNS7OSV23OI",
1820+
)
1821+
self.assertEqual(span.start, 1691102943.638)
1822+
self.assertEqual(span.span_type, "web")
1823+
self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self")
1824+
self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async")
1825+
17881826
def test_extract_context_from_eventbridge_event(self):
17891827
event_sample_source = "eventbridge-custom"
17901828
test_file = event_samples + event_sample_source + ".json"
@@ -1806,6 +1844,18 @@ def test_extract_dd_trace_context_for_eventbridge(self):
18061844
self.assertEqual(context["trace-id"], "12345")
18071845
self.assertEqual(context["parent-id"], "67890")
18081846

1847+
def test_extract_context_from_eventbridge_sqs_event(self):
1848+
event_sample_source = "eventbridge-sqs"
1849+
test_file = event_samples + event_sample_source + ".json"
1850+
with open(test_file, "r") as event:
1851+
event = json.load(event)
1852+
1853+
ctx = get_mock_context()
1854+
context, source, event_type = extract_dd_trace_context(event, ctx)
1855+
self.assertEqual(context["trace-id"], "7379586022458917877")
1856+
self.assertEqual(context["parent-id"], "2644033662113726488")
1857+
self.assertEqual(context["sampling-priority"], "1")
1858+
18091859
def test_extract_context_from_sqs_event_with_string_msg_attr(self):
18101860
event_sample_source = "sqs-string-msg-attribute"
18111861
test_file = event_samples + event_sample_source + ".json"

0 commit comments

Comments
 (0)