35
35
from ddtrace .propagation .http import HTTPPropagator
36
36
from datadog_lambda import __version__ as datadog_lambda_version
37
37
from datadog_lambda .trigger import (
38
+ _EventSource ,
38
39
parse_event_source ,
39
40
get_first_record ,
40
41
EventTypes ,
@@ -169,16 +170,21 @@ def extract_context_from_lambda_context(lambda_context):
169
170
return trace_id , parent_id , sampling_priority
170
171
171
172
172
- def extract_context_from_http_event_or_context (event , lambda_context ):
173
+ def extract_context_from_http_event_or_context (event , lambda_context , event_source : _EventSource ):
173
174
"""
174
175
Extract Datadog trace context from the `headers` key in from the Lambda
175
176
`event` object.
176
177
177
178
Falls back to lambda context if no trace data is found in the `headers`
178
179
"""
179
- if is_non_cached_authorizer_invocation (event ):
180
+ injected_authorizer_data = get_injected_authorizer_data (event , event_source )
181
+ if injected_authorizer_data :
180
182
try :
181
- return extract_context_from_authorizer_event (event )
183
+ # fail fast on any KeyError here
184
+ trace_id = injected_authorizer_data [TraceHeader .TRACE_ID ]
185
+ parent_id = injected_authorizer_data [TraceHeader .PARENT_ID ]
186
+ sampling_priority = injected_authorizer_data [TraceHeader .SAMPLING_PRIORITY ]
187
+ return trace_id , parent_id , sampling_priority
182
188
except Exception as e :
183
189
logger .debug (
184
190
"extract_context_from_authorizer_event returned with error. \
@@ -328,28 +334,31 @@ def extract_context_custom_extractor(extractor, event, lambda_context):
328
334
return None , None , None
329
335
330
336
331
- def extract_context_from_authorizer_event (event ):
332
- # fail fast if any KeyError until '_datadog'
333
- dd_data = json .loads (event ["requestContext" ]["authorizer" ]["_datadog" ])
334
- trace_id = dd_data .get (TraceHeader .TRACE_ID )
335
- parent_id = dd_data .get (TraceHeader .PARENT_ID )
336
- sampling_priority = dd_data .get (TraceHeader .SAMPLING_PRIORITY )
337
- return trace_id , parent_id , sampling_priority
338
-
339
-
340
- def is_non_cached_authorizer_invocation (event ) -> bool :
337
+ def get_injected_authorizer_data (event , event_source : _EventSource ) -> dict :
341
338
try :
339
+ dd_data_raw = None
342
340
# has '_datadog' header and not cached because integrationLatency > 0
343
- injected_authorizer_headers = event .get ("requestContext" , {}).get (
344
- "authorizer" , {}
345
- )
346
- return (
347
- injected_authorizer_headers .get ("_datadog" )
348
- and int (injected_authorizer_headers .get ("integrationLatency" , 0 )) > 0
349
- ) # non-cached
341
+ authorizer_headers = event .get ("requestContext" , {}).get ("authorizer" , {})
342
+
343
+ # [API_GATEWAY V1]if integrationLatency == 0, it's cached and no trace for the authorizer func
344
+ if event_source .equals (
345
+ EventTypes .API_GATEWAY , subtype = EventSubtypes .API_GATEWAY
346
+ ) and int (authorizer_headers .get ("integrationLatency" , 0 )) == 0 :
347
+ return None
348
+
349
+ if event_source .equals (EventTypes .API_GATEWAY , subtype = EventSubtypes .HTTP_API ):
350
+ dd_data_raw = authorizer_headers .get ("lambda" , {}).get ("_datadog" )
351
+ else :
352
+ dd_data_raw = authorizer_headers .get ("_datadog" )
353
+
354
+ if dd_data_raw :
355
+ dd_data = json .loads (dd_data_raw )
356
+ return dd_data
357
+ return None
358
+
350
359
except Exception as e :
351
360
logger .debug ("Failed to check if invocated by an authorizer. error %s" , e )
352
- return False
361
+ return None
353
362
354
363
355
364
def extract_dd_trace_context (event , lambda_context , extractor = None ):
@@ -374,7 +383,7 @@ def extract_dd_trace_context(event, lambda_context, extractor=None):
374
383
trace_id ,
375
384
parent_id ,
376
385
sampling_priority ,
377
- ) = extract_context_from_http_event_or_context (event , lambda_context )
386
+ ) = extract_context_from_http_event_or_context (event , lambda_context , event_source )
378
387
elif event_source .equals (EventTypes .SNS ) or event_source .equals (EventTypes .SQS ):
379
388
(
380
389
trace_id ,
@@ -414,7 +423,7 @@ def extract_dd_trace_context(event, lambda_context, extractor=None):
414
423
if dd_trace_context :
415
424
trace_context_source = TraceContextSource .XRAY
416
425
logger .debug ("extracted dd trace context %s" , dd_trace_context )
417
- return dd_trace_context , trace_context_source
426
+ return dd_trace_context , trace_context_source , event_source
418
427
419
428
420
429
def get_dd_trace_context ():
@@ -537,8 +546,9 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
537
546
)
538
547
539
548
540
- def create_inferred_span (event , context ):
541
- event_source = parse_event_source (event )
549
+ def create_inferred_span (event , context , event_source : _EventSource = None ):
550
+ if event_source is None :
551
+ event_source = parse_event_source (event )
542
552
try :
543
553
if event_source .equals (
544
554
EventTypes .API_GATEWAY , subtype = EventSubtypes .API_GATEWAY
@@ -649,20 +659,6 @@ def insert_upstream_authorizer_span(
649
659
return upstream_authorizer_span
650
660
651
661
652
- def get_start_and_finish_time_for_authorizer_span (request_context ):
653
- parsed_upstream_context = json .loads (
654
- request_context .get ("authorizer" , {}).get ("_datadog" )
655
- )
656
- start_time_s = (
657
- int (parsed_upstream_context .get (OtherConsts .parentSpanFinishTimeHeader )) / 1000
658
- )
659
- finish_time_s = (
660
- int (request_context .get ("requestTimeEpoch" ))
661
- + int (request_context .get ("authorizer" , {}).get ("integrationLatency" , 0 ))
662
- ) / 1000
663
- return start_time_s , finish_time_s
664
-
665
-
666
662
def create_inferred_span_from_api_gateway_websocket_event (event , context ):
667
663
trace_ctx = tracer .current_trace_context ()
668
664
request_context = event .get ("requestContext" )
@@ -681,7 +677,7 @@ def create_inferred_span_from_api_gateway_websocket_event(event, context):
681
677
"event_type" : request_context .get ("eventType" ),
682
678
"message_direction" : request_context .get ("messageDirection" ),
683
679
}
684
- request_time_epoch = request_context .get ("requestTimeEpoch" )
680
+ request_time_epoch_s = int ( request_context .get ("requestTimeEpoch" )) / 1000
685
681
if is_api_gateway_invocation_async (event ):
686
682
InferredSpanInfo .set_tags (tags , tag_source = "self" , synchronicity = "async" )
687
683
else :
@@ -693,14 +689,18 @@ def create_inferred_span_from_api_gateway_websocket_event(event, context):
693
689
}
694
690
tracer .set_tags ({"_dd.origin" : "lambda" })
695
691
upstream_authorizer_span = None
696
- if is_non_cached_authorizer_invocation (event ):
692
+ injected_authorizer_data = get_injected_authorizer_data (event , _EventSource (EventTypes .API_GATEWAY , EventSubtypes .WEBSOCKET ))
693
+ if injected_authorizer_data :
697
694
try :
698
- start_time_s , finish_time_s = get_start_and_finish_time_for_authorizer_span (
699
- request_context
700
- )
695
+ start_time_s = int (injected_authorizer_data .get (OtherConsts .parentSpanFinishTimeHeader )) / 1000
696
+ finish_time_s = request_time_epoch_s + \
697
+ (int (request_context .get ("authorizer" , {}).get ("integrationLatency" , 0 ))
698
+ ) / 1000
701
699
upstream_authorizer_span = insert_upstream_authorizer_span (
702
700
args , tags , start_time_s , finish_time_s
703
701
)
702
+ # trace context needs to be set again as it is reset by upstream_authorizer_span.finish
703
+ tracer .context_provider .activate (trace_ctx )
704
704
except Exception as e :
705
705
traceback .print_exc ()
706
706
logger .debug (
@@ -714,7 +714,7 @@ def create_inferred_span_from_api_gateway_websocket_event(event, context):
714
714
span = tracer .trace ("aws.apigateway.websocket" , ** args )
715
715
if span :
716
716
span .set_tags (tags )
717
- span .start = request_time_epoch / 1000
717
+ span .start = finish_time_s if finish_time_s is not None else request_time_epoch_s
718
718
if upstream_authorizer_span :
719
719
span .parent_id = upstream_authorizer_span .span_id
720
720
return span
@@ -738,7 +738,7 @@ def create_inferred_span_from_api_gateway_event(event, context):
738
738
"stage" : request_context .get ("stage" ),
739
739
"request_id" : request_context .get ("requestId" ),
740
740
}
741
- request_time_epoch = int (request_context .get ("requestTimeEpoch" )) / 1000
741
+ request_time_epoch_s = int (request_context .get ("requestTimeEpoch" )) / 1000
742
742
if is_api_gateway_invocation_async (event ):
743
743
InferredSpanInfo .set_tags (tags , tag_source = "self" , synchronicity = "async" )
744
744
else :
@@ -751,14 +751,17 @@ def create_inferred_span_from_api_gateway_event(event, context):
751
751
tracer .set_tags ({"_dd.origin" : "lambda" })
752
752
upstream_authorizer_span = None
753
753
finish_time_s = None
754
- if is_non_cached_authorizer_invocation (event ):
754
+ injected_authorizer_data = get_injected_authorizer_data (event , _EventSource (EventTypes .API_GATEWAY , EventSubtypes .API_GATEWAY ))
755
+ if injected_authorizer_data :
755
756
try :
756
- start_time_s , finish_time_s = get_start_and_finish_time_for_authorizer_span (
757
- request_context
758
- )
757
+ start_time_s = int ( injected_authorizer_data . get ( OtherConsts . parentSpanFinishTimeHeader )) / 1000
758
+ finish_time_s = request_time_epoch_s + \
759
+ ( int ( request_context . get ( "authorizer" , {}). get ( "integrationLatency" , 0 ))) / 1000
759
760
upstream_authorizer_span = insert_upstream_authorizer_span (
760
761
args , tags , start_time_s , finish_time_s
761
762
)
763
+ # trace context needs to be set again as it is reset by upstream_authorizer_span.finish
764
+ tracer .context_provider .activate (trace_ctx )
762
765
except Exception as e :
763
766
traceback .print_exc ()
764
767
logger .debug (
@@ -767,13 +770,12 @@ def create_inferred_span_from_api_gateway_event(event, context):
767
770
event ,
768
771
e ,
769
772
)
770
- # trace context needs to be set again as it is reset by upstream_authorizer_span.finish
771
- tracer .context_provider .activate (trace_ctx )
773
+
772
774
span = tracer .trace ("aws.apigateway" , ** args )
773
775
if span :
774
776
span .set_tags (tags )
775
777
# start time pushed by the inserted authorizer span
776
- span .start = finish_time_s if finish_time_s is not None else request_time_epoch
778
+ span .start = finish_time_s if finish_time_s is not None else request_time_epoch_s
777
779
if upstream_authorizer_span :
778
780
span .parent_id = upstream_authorizer_span .span_id
779
781
return span
@@ -800,7 +802,7 @@ def create_inferred_span_from_http_api_event(event, context):
800
802
"apiname" : request_context .get ("apiId" ),
801
803
"stage" : request_context .get ("stage" ),
802
804
}
803
- request_time_epoch = request_context .get ("timeEpoch" )
805
+ request_time_epoch_s = int ( request_context .get ("timeEpoch" )) / 1000
804
806
if is_api_gateway_invocation_async (event ):
805
807
InferredSpanInfo .set_tags (tags , tag_source = "self" , synchronicity = "async" )
806
808
else :
@@ -812,14 +814,17 @@ def create_inferred_span_from_http_api_event(event, context):
812
814
}
813
815
tracer .set_tags ({"_dd.origin" : "lambda" })
814
816
upstream_authorizer_span = None
815
- if is_non_cached_authorizer_invocation (event ):
817
+ finish_time_s = None
818
+ injected_authorizer_data = get_injected_authorizer_data (event , _EventSource (EventTypes .API_GATEWAY , EventSubtypes .HTTP_API ))
819
+ if injected_authorizer_data :
816
820
try :
817
- start_time_s , finish_time_s = get_start_and_finish_time_for_authorizer_span (
818
- request_context
819
- )
821
+ start_time_s = int (injected_authorizer_data .get (OtherConsts .parentSpanFinishTimeHeader )) / 1000
822
+ finish_time_s = start_time_s # we don't have the integrationLatency info for the authorizer
820
823
upstream_authorizer_span = insert_upstream_authorizer_span (
821
824
args , tags , start_time_s , finish_time_s
822
825
)
826
+ # trace context needs to be set again as it is reset by upstream_authorizer_span.finish
827
+ tracer .context_provider .activate (trace_ctx )
823
828
except Exception as e :
824
829
traceback .print_exc ()
825
830
logger .debug (
@@ -828,12 +833,10 @@ def create_inferred_span_from_http_api_event(event, context):
828
833
event ,
829
834
e ,
830
835
)
831
- # trace context needs to be set again as it is reset by upstream_authorizer_span.finish
832
- tracer .context_provider .activate (trace_ctx )
833
836
span = tracer .trace ("aws.httpapi" , ** args )
834
837
if span :
835
838
span .set_tags (tags )
836
- span .start = request_time_epoch / 1e3
839
+ span .start = finish_time_s if finish_time_s is not None else request_time_epoch_s
837
840
if upstream_authorizer_span :
838
841
span .parent_id = upstream_authorizer_span .span_id
839
842
return span
0 commit comments