def _extract_context_from_eventbridge_sqs_event(event):
    """
    Extract Datadog trace context from an SQS event triggered by
    EventBridge.

    This is only possible if the first record in `Records` has a `body`
    field holding the EventBridge event as a JSON string, and that event's
    `detail` carries a `_datadog` mapping with the trace headers.

    Returns a `(trace_id, parent_id, sampling_priority)` tuple.

    Raises `ValueError` when the event does not have that shape or the
    `_datadog` mapping has no trace/parent id (a malformed body surfaces as
    the error raised by `json.loads`). Failing loudly instead of returning
    `None` or a tuple of `None`s lets the caller move on to its next
    extraction strategy.
    """
    records = event.get("Records")
    if not records:
        raise ValueError("No records found in SQS event")

    first_record = records[0]
    if "body" not in first_record:
        raise ValueError("First SQS record has no body")

    body = json.loads(first_record["body"])

    # Each level may be missing or of the wrong type when the message did
    # not originate from EventBridge (e.g. a plain-text or SNS body).
    detail = body.get("detail") if isinstance(body, dict) else None
    dd_context = detail.get("_datadog") if isinstance(detail, dict) else None
    if not isinstance(dd_context, dict):
        raise ValueError("No Datadog context found in EventBridge detail")

    trace_id = dd_context.get(TraceHeader.TRACE_ID)
    parent_id = dd_context.get(TraceHeader.PARENT_ID)
    sampling_priority = dd_context.get(TraceHeader.SAMPLING_PRIORITY)

    # An empty `_datadog` mapping must not be reported as a successful
    # extraction, otherwise the remaining fallbacks are never tried.
    if not trace_id or not parent_id:
        raise ValueError("Datadog context is missing trace or parent id")

    return trace_id, parent_id, sampling_priority
+ span.parent_id = upstream_span.span_id return span diff --git a/tests/event_samples/eventbridge-sqs.json b/tests/event_samples/eventbridge-sqs.json new file mode 100644 index 00000000..606abc09 --- /dev/null +++ b/tests/event_samples/eventbridge-sqs.json @@ -0,0 +1,21 @@ +{ + "Records": [ + { + "messageId": "e995e54f-1724-41fa-82c0-8b81821f854e", + "receiptHandle": "AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=", + "body": "{\"version\":\"0\",\"id\":\"af718b2a-b987-e8c0-7a2b-a188fad2661a\",\"detail-type\":\"my.Detail\",\"source\":\"my.Source\",\"account\":\"425362996713\",\"time\":\"2023-08-03T22:49:03Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"text\":\"Hello, world!\",\"_datadog\":{\"x-datadog-trace-id\":\"7379586022458917877\",\"x-datadog-parent-id\":\"2644033662113726488\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-tags\":\"_dd.p.dm=-0\",\"traceparent\":\"00-000000000000000066698e63821a03f5-24b17e9b6476c018-01\",\"tracestate\":\"dd=t.dm:-0;s:1\"}}}", + "attributes": { + "ApproximateReceiveCount": "1", + "AWSTraceHeader": "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0", + "SentTimestamp": "1691102943638", + "SenderId": "AIDAJXNJGGKNS7OSV23OI", + "ApproximateFirstReceiveTimestamp": "1691102943647" + }, + "messageAttributes": {}, + "md5OfBody": "93d9f0cd8886d1e000a1a0b7007bffc4", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue", + "awsRegion": "us-east-1" + } + ] +} diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 978c0f1f..24e6dcdd 100644 --- 
def test_create_inferred_span_from_eventbridge_sqs_event(self):
    """The `eventbridge-sqs` sample produces an `aws.sqs` inferred span."""
    sample_path = event_samples + "eventbridge-sqs" + ".json"
    with open(sample_path, "r") as fixture:
        sqs_event = json.load(fixture)

    mock_context = get_mock_context()
    mock_context.aws_request_id = "123"
    span = create_inferred_span(sqs_event, mock_context)

    self.assertEqual(span.get_tag("operation_name"), "aws.sqs")
    self.assertEqual(span.service, "sqs")

    # Tags checked in the same order as the individual assertions they
    # replace; `None` means the tag is expected to be absent.
    expected_tags = (
        ("http.url", None),
        ("endpoint", None),
        ("http.method", None),
        ("resource_names", "eventbridge-sqs-queue"),
        ("request_id", None),
        ("queuename", "eventbridge-sqs-queue"),
        (
            "event_source_arn",
            "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue",
        ),
        ("sender_id", "AIDAJXNJGGKNS7OSV23OI"),
    )
    for tag_name, expected_value in expected_tags:
        self.assertEqual(span.get_tag(tag_name), expected_value)

    self.assertEqual(span.start, 1691102943.638)
    self.assertEqual(span.span_type, "web")
    self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self")
    self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async")


def test_extract_context_from_eventbridge_sqs_event(self):
    """Trace context is read from the EventBridge payload in the SQS body."""
    sample_path = event_samples + "eventbridge-sqs" + ".json"
    with open(sample_path, "r") as fixture:
        sqs_event = json.load(fixture)

    mock_context = get_mock_context()
    extracted, _source, _event_type = extract_dd_trace_context(
        sqs_event, mock_context
    )

    expected_context = (
        ("trace-id", "7379586022458917877"),
        ("parent-id", "2644033662113726488"),
        ("sampling-priority", "1"),
    )
    for key, expected_value in expected_context:
        self.assertEqual(extracted[key], expected_value)