Skip to content

feat(instrumentation): Set peer.service as the traced object name for client spans to mark the destination services #2967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,14 @@ def _wrap(
return wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
peer_service = to_wrap.get("object")
span = tracer.start_span(
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "Anthropic",
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
SpanAttributes.PEER_SERVICE: peer_service
},
)

Expand Down Expand Up @@ -682,12 +684,14 @@ async def _awrap(
return await wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
peer_service = to_wrap.get("object")
span = tracer.start_span(
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "Anthropic",
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
SpanAttributes.PEER_SERVICE: peer_service
},
)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,18 @@ def _wrap(
metric_params.start_time = time.time()
client = wrapped(*args, **kwargs)
client.invoke_model = _instrumented_model_invoke(
client.invoke_model, tracer, metric_params
client.invoke_model, tracer, metric_params, to_wrap
)
client.invoke_model_with_response_stream = (
_instrumented_model_invoke_with_response_stream(
client.invoke_model_with_response_stream, tracer, metric_params
client.invoke_model_with_response_stream, tracer, metric_params, to_wrap
)
)
client.converse = _instrumented_converse(
client.converse, tracer, metric_params
client.converse, tracer, metric_params, to_wrap
)
client.converse_stream = _instrumented_converse_stream(
client.converse_stream, tracer, metric_params
client.converse_stream, tracer, metric_params, to_wrap
)
return client
except Exception as e:
Expand All @@ -189,14 +189,16 @@ def _wrap(
return wrapped(*args, **kwargs)


def _instrumented_model_invoke(fn, tracer, metric_params):
def _instrumented_model_invoke(fn, tracer, metric_params, to_wrap):
@wraps(fn)
def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

with tracer.start_as_current_span(
_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT
_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT, attributes={
SpanAttributes.PEER_SERVICE: to_wrap.get("object"),
}
) as span:
response = fn(*args, **kwargs)

Expand All @@ -208,13 +210,15 @@ def with_instrumentation(*args, **kwargs):
return with_instrumentation


def _instrumented_model_invoke_with_response_stream(fn, tracer, metric_params):
def _instrumented_model_invoke_with_response_stream(fn, tracer, metric_params, to_wrap):
@wraps(fn)
def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

span = tracer.start_span(_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT)
span = tracer.start_span(_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT, attributes={
SpanAttributes.PEER_SERVICE: to_wrap.get("object"),
})
response = fn(*args, **kwargs)

if span.is_recording():
Expand All @@ -225,7 +229,7 @@ def with_instrumentation(*args, **kwargs):
return with_instrumentation


def _instrumented_converse(fn, tracer, metric_params):
def _instrumented_converse(fn, tracer, metric_params, to_wrap):
# see
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse.html
# for the request/response format
Expand All @@ -235,7 +239,9 @@ def with_instrumentation(*args, **kwargs):
return fn(*args, **kwargs)

with tracer.start_as_current_span(
_BEDROCK_CONVERSE_SPAN_NAME, kind=SpanKind.CLIENT
_BEDROCK_CONVERSE_SPAN_NAME, kind=SpanKind.CLIENT, attributes={
SpanAttributes.PEER_SERVICE: to_wrap.get("object"),
}
) as span:
response = fn(*args, **kwargs)
_handle_converse(span, kwargs, response, metric_params)
Expand All @@ -245,13 +251,15 @@ def with_instrumentation(*args, **kwargs):
return with_instrumentation


def _instrumented_converse_stream(fn, tracer, metric_params):
def _instrumented_converse_stream(fn, tracer, metric_params, to_wrap):
@wraps(fn)
def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

span = tracer.start_span(_BEDROCK_CONVERSE_SPAN_NAME, kind=SpanKind.CLIENT)
span = tracer.start_span(_BEDROCK_CONVERSE_SPAN_NAME, kind=SpanKind.CLIENT, attributes={
SpanAttributes.PEER_SERVICE: to_wrap.get("object"),
})
response = fn(*args, **kwargs)
if span.is_recording():
_handle_converse_stream(span, kwargs, response, metric_params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def wrap_agent_execute_task(tracer, duration_histogram, token_histogram, wrapped
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
SpanAttributes.LLM_SYSTEM: "crewai",
SpanAttributes.PEER_SERVICE: "Agent"
}
) as span:
try:
Expand Down Expand Up @@ -140,6 +142,8 @@ def wrap_task_execute(tracer, duration_histogram, token_histogram, wrapped, inst
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.TASK.value,
SpanAttributes.LLM_SYSTEM: "crewai",
SpanAttributes.PEER_SERVICE: "Task"
}
) as span:
try:
Expand All @@ -160,6 +164,8 @@ def wrap_llm_call(tracer, duration_histogram, token_histogram, wrapped, instance
f"{llm}.llm",
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "crewai",
SpanAttributes.PEER_SERVICE: "LLM"
}
) as span:
start_time = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,14 @@ def _wrap(
return wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
peer_service = to_wrap.get("object")
span = tracer.start_span(
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "Groq",
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
SpanAttributes.PEER_SERVICE: peer_service
},
)

Expand Down Expand Up @@ -484,12 +486,14 @@ async def _awrap(
return await wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
peer_service = to_wrap.get("object")
span = tracer.start_span(
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "Groq",
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
SpanAttributes.PEER_SERVICE: peer_service
},
)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def _create_llm_span(
)
_set_span_attribute(span, SpanAttributes.LLM_SYSTEM, "Langchain")
_set_span_attribute(span, SpanAttributes.LLM_REQUEST_TYPE, request_type.value)
_set_span_attribute(span, SpanAttributes.PEER_SYSTEM, name)

return span

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
if kwargs.get("service_name") == "sagemaker-runtime":
client = wrapped(*args, **kwargs)
client.invoke_endpoint = _instrumented_endpoint_invoke(
client.invoke_endpoint, tracer
client.invoke_endpoint, tracer, to_wrap
)
client.invoke_endpoint_with_response_stream = (
_instrumented_endpoint_invoke_with_response_stream(
client.invoke_endpoint_with_response_stream, tracer
client.invoke_endpoint_with_response_stream, tracer, to_wrap
)
)

Expand All @@ -89,14 +89,17 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
return wrapped(*args, **kwargs)


def _instrumented_endpoint_invoke(fn, tracer):
def _instrumented_endpoint_invoke(fn, tracer, to_wrap):
@wraps(fn)
def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

with tracer.start_as_current_span(
"sagemaker.completion", kind=SpanKind.CLIENT
"sagemaker.completion", kind=SpanKind.CLIENT, attributes={
SpanAttributes.PEER_SERVICE: to_wrap.get("object"),
SpanAttributes.LLM_SYSTEM: "SageMaker"
}
) as span:
response = fn(*args, **kwargs)

Expand All @@ -108,13 +111,16 @@ def with_instrumentation(*args, **kwargs):
return with_instrumentation


def _instrumented_endpoint_invoke_with_response_stream(fn, tracer):
def _instrumented_endpoint_invoke_with_response_stream(fn, tracer, to_wrap):
@wraps(fn)
def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

span = tracer.start_span("sagemaker.completion", kind=SpanKind.CLIENT)
span = tracer.start_span("sagemaker.completion", kind=SpanKind.CLIENT, attributes={
SpanAttributes.PEER_SERVICE: to_wrap.get("object"),
SpanAttributes.LLM_SYSTEM: "SageMaker",
})
response = fn(*args, **kwargs)

if span.is_recording():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,15 @@ def _wrap(
return wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
peer_service = to_wrap.get("object")

span = tracer.start_span(
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "Watsonx",
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
SpanAttributes.PEER_SERVICE: peer_service
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ class SpanAttributes:
MCP_SESSION_INIT_OPTIONS = "mcp.session.init_options"
MCP_RESPONSE_VALUE = "mcp.response.value"

# Service
PEER_SERVICE = "peer.service"

class Events(Enum):
DB_QUERY_EMBEDDINGS = "db.query.embeddings"
Expand Down
Loading