From 535dd9f039b846c6d085cb0c740cd3099a9a349c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 4 Aug 2023 16:58:38 -0400 Subject: [PATCH 1/7] add eventbridge to sqs inferred span support also allowed to properly extract trace context --- datadog_lambda/tracing.py | 57 ++++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 198332a3..f945d8ef 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -233,6 +233,13 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): Falls back to lambda context if no trace data is found in the SQS message attributes. """ + + # EventBrdige => SQS + try: + return _extract_context_from_eventbridge_sqs_event(event) + except Exception: + logger.debug("Failed extracting context as EventBridge to SQS.") + try: first_record = event.get("Records")[0] @@ -283,6 +290,30 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): return extract_context_from_lambda_context(lambda_context) +def _extract_context_from_eventbridge_sqs_event(event): + """ + Extracts Datadog trace context from an SQS event triggered by + EventBridge. + + This is only possible if first record in `Records` contains a + `body` field which contains the EventBridge `detail` as a JSON string. + """ + try: + first_record = event.get("Records")[0] + if "body" in first_record: + body_str = first_record.get("body", {}) + body = json.loads(body_str) + + detail = body.get("detail") + dd_context = detail.get("_datadog") + trace_id = dd_context.get(TraceHeader.TRACE_ID) + parent_id = dd_context.get(TraceHeader.PARENT_ID) + sampling_priority = dd_context.get(TraceHeader.SAMPLING_PRIORITY) + return trace_id, parent_id, sampling_priority + except Exception as e: + raise + + def extract_context_from_eventbridge_event(event, lambda_context): """ Extract datadog trace context from an EventBridge message's Details. @@ -995,21 +1026,33 @@ def create_inferred_span_from_sqs_event(event, context): } start_time = int(request_time_epoch) / 1000 - # logic to deal with SNS => SQS event - sns_span = None + upstream_span = None if "body" in event_record: body_str = event_record.get("body", {}) try: body = json.loads(body_str) + + # logic to deal with SNS => SQS event if body.get("Type", "") == "Notification" and "TopicArn" in body: logger.debug("Found SNS message inside SQS event") - sns_span = create_inferred_span_from_sns_event( + upstream_span = create_inferred_span_from_sns_event( create_sns_event(body), context ) - sns_span.finish(finish_time=start_time) + upstream_span.finish(finish_time=start_time) + + # EventBridge => SQS + elif body.get("detail"): + detail = body.get("detail") + if detail.get("_datadog"): + logger.debug("Found an EventBridge message inside SQS event") + upstream_span = create_inferred_span_from_eventbridge_event( + body, context + ) + upstream_span.finish(finish_time=start_time) + except Exception as e: logger.debug( - "Unable to create SNS span from SQS message, with error %s" % e + "Unable to create upstream span from SQS message, with error %s" % e ) pass @@ -1021,8 +1064,8 @@ def create_inferred_span_from_sqs_event(event, context): if span: span.set_tags(tags) span.start = start_time - if sns_span: - span.parent_id = sns_span.span_id + if upstream_span: + span.parent_id = upstream_span.span_id return span From 1e555bbd595456d00babff986327963b72238316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 4 Aug 2023 16:58:52 -0400 Subject: [PATCH 2/7] add unit tests --- tests/event_samples/eventbridge-sqs.json | 41 ++++++++++++++++++ tests/test_tracing.py | 53 ++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 tests/event_samples/eventbridge-sqs.json diff --git a/tests/event_samples/eventbridge-sqs.json b/tests/event_samples/eventbridge-sqs.json new file mode 100644 index 00000000..adf2f138 --- /dev/null +++ b/tests/event_samples/eventbridge-sqs.json @@ -0,0 +1,41 @@ +{ + "Records": [ + { + "messageId": "e995e54f-1724-41fa-82c0-8b81821f854e", + "receiptHandle": "AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=", + "body": { + "version": "0", + "id": "af718b2a-b987-e8c0-7a2b-a188fad2661a", + "detail-type": "my.Detail", + "source": "my.Source", + "account": "425362996713", + "time": "2023-08-03T22: 49: 03Z", + "region": "us-east-1", + "resources": [], + "detail": { + "text": "Hello, world!", + "_datadog": { + "x-datadog-trace-id": "7379586022458917877", + "x-datadog-parent-id": "2644033662113726488", + "x-datadog-sampling-priority": "1", + "x-datadog-tags": "_dd.p.dm=-0", + "traceparent": "00-000000000000000066698e63821a03f5-24b17e9b6476c018-01", + "tracestate": "dd=t.dm: -0;s: 1" + } + } + }, + "attributes": { + "ApproximateReceiveCount": "1", + "AWSTraceHeader": "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0", + "SentTimestamp": "1691102943638", + "SenderId": "AIDAJXNJGGKNS7OSV23OI", + "ApproximateFirstReceiveTimestamp": "1691102943647" + }, + "messageAttributes": {}, + "md5OfBody": "93d9f0cd8886d1e000a1a0b7007bffc4", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue", + "awsRegion": "us-east-1" + } + ] +} diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 978c0f1f..1ab38eb1 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -1785,6 +1785,45 @@ def test_create_inferred_span_from_eventbridge_event(self): self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self") self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async") + def test_create_inferred_span_from_eventbridge_sqs_event(self): + event_sample_name = "eventbridge-sqs" + test_file = event_samples + event_sample_name + ".json" + with open(test_file, "r") as event: + event = json.load(event) + ctx = get_mock_context() + ctx.aws_request_id = "123" + span = create_inferred_span(event, ctx) + self.assertEqual(span.get_tag("operation_name"), "aws.sqs") + self.assertEqual( + span.service, + "sqs", + ) + self.assertEqual( + span.get_tag("http.url"), + None, + ) + self.assertEqual(span.get_tag("endpoint"), None) + self.assertEqual(span.get_tag("http.method"), None) + self.assertEqual( + span.get_tag("resource_names"), + "eventbridge-sqs-queue", + ) + self.assertEqual(span.get_tag("request_id"), None) + self.assertEqual(span.get_tag("queuename"), "eventbridge-sqs-queue") + self.assertEqual( + span.get_tag("event_source_arn"), + "arn:aws:sqs:us-east-1:425362996713:eventbridge-sqs-queue", + ) + self.assertEqual( + span.get_tag("sender_id"), + "AIDAJXNJGGKNS7OSV23OI", + ) + self.assertEqual(span.start, 1691102943.638) + self.assertEqual(span.span_type, "web") + self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self") + self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async") + + def test_extract_context_from_eventbridge_event(self): event_sample_source = "eventbridge-custom" test_file = event_samples + event_sample_source + ".json" @@ -1806,6 +1845,20 @@ def test_extract_dd_trace_context_for_eventbridge(self): self.assertEqual(context["trace-id"], "12345") self.assertEqual(context["parent-id"], "67890") + + def test_extract_context_from_eventbridge_sqs_event(self): + event_sample_source = "eventbridge-sqs" + test_file = event_samples + event_sample_source + ".json" + with open(test_file, "r") as event: + event = json.load(event) + + ctx = get_mock_context() + context, _, __ = extract_dd_trace_context(event, ctx) + self.assertEqual(context["trace-id"], "7379586022458917877") + self.assertEqual(context["parent-id"], "2644033662113726488") + self.assertEqual(context["sampling-priority"], "1") + + def test_extract_context_from_sqs_event_with_string_msg_attr(self): event_sample_source = "sqs-string-msg-attribute" test_file = event_samples + event_sample_source + ".json" From a4b532a3c2b422455f8057f35e3a5e1dbedece2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 4 Aug 2023 17:22:07 -0400 Subject: [PATCH 3/7] lint and fix event --- datadog_lambda/tracing.py | 4 ++-- tests/event_samples/eventbridge-sqs.json | 22 +--------------------- tests/test_tracing.py | 2 +- 3 files changed, 4 insertions(+), 24 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index f945d8ef..4a308bf7 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -310,7 +310,7 @@ def _extract_context_from_eventbridge_sqs_event(event): parent_id = dd_context.get(TraceHeader.PARENT_ID) sampling_priority = dd_context.get(TraceHeader.SAMPLING_PRIORITY) return trace_id, parent_id, sampling_priority - except Exception as e: + except Exception: raise @@ -1049,7 +1049,7 @@ def create_inferred_span_from_sqs_event(event, context): body, context ) upstream_span.finish(finish_time=start_time) - + except Exception as e: logger.debug( "Unable to create upstream span from SQS message, with error %s" % e diff --git a/tests/event_samples/eventbridge-sqs.json b/tests/event_samples/eventbridge-sqs.json index adf2f138..606abc09 100644 --- a/tests/event_samples/eventbridge-sqs.json +++ b/tests/event_samples/eventbridge-sqs.json @@ -3,27 +3,7 @@ { "messageId": "e995e54f-1724-41fa-82c0-8b81821f854e", "receiptHandle": "AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=", - "body": { - "version": "0", - "id": "af718b2a-b987-e8c0-7a2b-a188fad2661a", - "detail-type": "my.Detail", - "source": "my.Source", - "account": "425362996713", - "time": "2023-08-03T22: 49: 03Z", - "region": "us-east-1", - "resources": [], - "detail": { - "text": "Hello, world!", - "_datadog": { - "x-datadog-trace-id": "7379586022458917877", - "x-datadog-parent-id": "2644033662113726488", - "x-datadog-sampling-priority": "1", - "x-datadog-tags": "_dd.p.dm=-0", - "traceparent": "00-000000000000000066698e63821a03f5-24b17e9b6476c018-01", - "tracestate": "dd=t.dm: -0;s: 1" - } - } - }, + "body": "{\"version\":\"0\",\"id\":\"af718b2a-b987-e8c0-7a2b-a188fad2661a\",\"detail-type\":\"my.Detail\",\"source\":\"my.Source\",\"account\":\"425362996713\",\"time\":\"2023-08-03T22:49:03Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"text\":\"Hello, world!\",\"_datadog\":{\"x-datadog-trace-id\":\"7379586022458917877\",\"x-datadog-parent-id\":\"2644033662113726488\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-tags\":\"_dd.p.dm=-0\",\"traceparent\":\"00-000000000000000066698e63821a03f5-24b17e9b6476c018-01\",\"tracestate\":\"dd=t.dm:-0;s:1\"}}}", "attributes": { "ApproximateReceiveCount": "1", "AWSTraceHeader": "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0", diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 1ab38eb1..206ee414 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -1853,7 +1853,7 @@ def test_extract_context_from_eventbridge_sqs_event(self): event = json.load(event) ctx = get_mock_context() - context, _, __ = extract_dd_trace_context(event, ctx) + context, source, event_type = extract_dd_trace_context(event, ctx) self.assertEqual(context["trace-id"], "7379586022458917877") self.assertEqual(context["parent-id"], "2644033662113726488") self.assertEqual(context["sampling-priority"], "1") From 463701aee8dbe7797c15eba182ccfc3d07e24337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 4 Aug 2023 17:25:09 -0400 Subject: [PATCH 4/7] lint --- tests/test_tracing.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 206ee414..24e6dcdd 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -1823,7 +1823,6 @@ def test_create_inferred_span_from_eventbridge_sqs_event(self): self.assertEqual(span.get_tag(InferredSpanInfo.TAG_SOURCE), "self") self.assertEqual(span.get_tag(InferredSpanInfo.SYNCHRONICITY), "async") - def test_extract_context_from_eventbridge_event(self): event_sample_source = "eventbridge-custom" test_file = event_samples + event_sample_source + ".json" @@ -1845,7 +1844,6 @@ def test_extract_dd_trace_context_for_eventbridge(self): self.assertEqual(context["trace-id"], "12345") self.assertEqual(context["parent-id"], "67890") - def test_extract_context_from_eventbridge_sqs_event(self): event_sample_source = "eventbridge-sqs" test_file = event_samples + event_sample_source + ".json" @@ -1858,7 +1856,6 @@ def test_extract_context_from_eventbridge_sqs_event(self): self.assertEqual(context["parent-id"], "2644033662113726488") self.assertEqual(context["sampling-priority"], "1") - def test_extract_context_from_sqs_event_with_string_msg_attr(self): event_sample_source = "sqs-string-msg-attribute" test_file = event_samples + event_sample_source + ".json" From c7ed97c6eada3b8e8c65e3565c1a16ae094a13d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 4 Aug 2023 19:51:01 -0400 Subject: [PATCH 5/7] fix unpacking --- datadog_lambda/tracing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 4a308bf7..6ea36312 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -236,7 +236,8 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): # EventBrdige => SQS try: - return _extract_context_from_eventbridge_sqs_event(event) + trace_id, parent_id, sampling_priority = _extract_context_from_eventbridge_sqs_event(event) + return trace_id, parent_id, sampling_priority except Exception: logger.debug("Failed extracting context as EventBridge to SQS.") From ec17efd14c51bcb47451ced7c2a03ea2f602c7f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 4 Aug 2023 19:57:35 -0400 Subject: [PATCH 6/7] lint --- datadog_lambda/tracing.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 6ea36312..cf2f8668 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -236,7 +236,11 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): # EventBrdige => SQS try: - trace_id, parent_id, sampling_priority = _extract_context_from_eventbridge_sqs_event(event) + ( + trace_id, + parent_id, + sampling_priority, + ) = _extract_context_from_eventbridge_sqs_event(event) return trace_id, parent_id, sampling_priority except Exception: logger.debug("Failed extracting context as EventBridge to SQS.") From 3ffcca48d0b1ecb89a36dcf8b2c9de37dc131afe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 7 Aug 2023 10:19:08 -0400 Subject: [PATCH 7/7] update docs --- datadog_lambda/tracing.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index cf2f8668..92c3e01c 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -229,12 +229,19 @@ def create_sns_event(message): def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): """ - Extract Datadog trace context from the first SQS message attributes. + Extract Datadog trace context from an SQS event. + + The extraction chain goes as follows: + EB => SQS (First records body contains EB context), or + SNS => SQS (First records body contains SNS context), or + SQS or SNS (`messageAttributes` for SQS context, + `MessageAttributes` for SNS context), else + Lambda Context. Falls back to lambda context if no trace data is found in the SQS message attributes. """ - # EventBrdige => SQS + # EventBridge => SQS try: ( trace_id,