Skip to content

Move OpenTelemetry context manager to main client #2490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions elasticsearch/_async/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
ListApiResponse,
NodeConfig,
ObjectApiResponse,
OpenTelemetrySpan,
SniffOptions,
TextApiResponse,
)
from elastic_transport.client_utils import DEFAULT, DefaultType

from ..._otel import OpenTelemetry
from ..._version import __versionstr__
from ...compat import warn_stacklevel
from ...exceptions import (
Expand Down Expand Up @@ -244,6 +246,7 @@ def __init__(self, _transport: AsyncTransport) -> None:
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
self._verified_elasticsearch = False
self._otel = OpenTelemetry()

@property
def transport(self) -> AsyncTransport:
Expand All @@ -259,6 +262,32 @@ async def perform_request(
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
with self._otel.span(
method,
endpoint_id=endpoint_id,
path_parts=path_parts or {},
) as otel_span:
response = await self._perform_request(
method,
path,
params=params,
headers=headers,
body=body,
otel_span=otel_span,
)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

async def _perform_request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
otel_span: OpenTelemetrySpan,
) -> ApiResponse[Any]:
if headers:
request_headers = self._headers.copy()
Expand Down Expand Up @@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
retry_on_status=self._retry_on_status,
retry_on_timeout=self._retry_on_timeout,
client_meta=self._client_meta,
endpoint_id=endpoint_id,
path_parts=path_parts,
otel_span=otel_span,
)

# HEAD with a 404 is returned as a normal response
Expand Down
92 changes: 92 additions & 0 deletions elasticsearch/_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import contextlib
import os
from typing import TYPE_CHECKING, Generator, Mapping

if TYPE_CHECKING:
from typing import Literal

try:
from opentelemetry import trace

_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
except ModuleNotFoundError:
_tracer = None

from elastic_transport import OpenTelemetrySpan

# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
# Describes how to handle search queries in the request body when assigned to
# a span attribute.
# Valid values are 'omit' and 'raw'.
# Default is 'omit' as 'raw' has security implications.
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
DEFAULT_BODY_STRATEGY = "omit"


class OpenTelemetry:
def __init__(
self,
enabled: bool | None = None,
tracer: trace.Tracer | None = None,
# TODO import Literal at the top-level when dropping Python 3.7
body_strategy: 'Literal["omit", "raw"]' | None = None,
):
if enabled is None:
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
self.tracer = tracer or _tracer
self.enabled = enabled and self.tracer is not None

if body_strategy is not None:
self.body_strategy = body_strategy
else:
self.body_strategy = os.environ.get(
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
) # type: ignore[assignment]
assert self.body_strategy in ("omit", "raw")

@contextlib.contextmanager
def span(
self,
method: str,
*,
endpoint_id: str | None,
path_parts: Mapping[str, str],
) -> Generator[OpenTelemetrySpan, None, None]:
if not self.enabled or self.tracer is None:
yield OpenTelemetrySpan(None)
return

span_name = endpoint_id or method
with self.tracer.start_as_current_span(span_name) as otel_span:
otel_span.set_attribute("http.request.method", method)
otel_span.set_attribute("db.system", "elasticsearch")
if endpoint_id is not None:
otel_span.set_attribute("db.operation", endpoint_id)
for key, value in path_parts.items():
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)

yield OpenTelemetrySpan(
otel_span,
endpoint_id=endpoint_id,
body_strategy=self.body_strategy,
)
32 changes: 30 additions & 2 deletions elasticsearch/_sync/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
ListApiResponse,
NodeConfig,
ObjectApiResponse,
OpenTelemetrySpan,
SniffOptions,
TextApiResponse,
Transport,
)
from elastic_transport.client_utils import DEFAULT, DefaultType

from ..._otel import OpenTelemetry
from ..._version import __versionstr__
from ...compat import warn_stacklevel
from ...exceptions import (
Expand Down Expand Up @@ -244,6 +246,7 @@ def __init__(self, _transport: Transport) -> None:
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
self._verified_elasticsearch = False
self._otel = OpenTelemetry()

@property
def transport(self) -> Transport:
Expand All @@ -259,6 +262,32 @@ def perform_request(
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
with self._otel.span(
method,
endpoint_id=endpoint_id,
path_parts=path_parts or {},
) as otel_span:
response = self._perform_request(
method,
path,
params=params,
headers=headers,
body=body,
otel_span=otel_span,
)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

def _perform_request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
otel_span: OpenTelemetrySpan,
) -> ApiResponse[Any]:
if headers:
request_headers = self._headers.copy()
Expand Down Expand Up @@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
retry_on_status=self._retry_on_status,
retry_on_timeout=self._retry_on_timeout,
client_meta=self._client_meta,
endpoint_id=endpoint_id,
path_parts=path_parts,
otel_span=otel_span,
)

# HEAD with a 404 is returned as a normal response
Expand Down
31 changes: 11 additions & 20 deletions test_elasticsearch/test_client/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

import pytest
from elastic_transport import OpenTelemetrySpan
from elastic_transport.client_utils import DEFAULT

from elasticsearch import AsyncElasticsearch, Elasticsearch
Expand Down Expand Up @@ -137,13 +138,12 @@ def test_options_passed_to_perform_request(self):
assert call.pop("retry_on_timeout") is DEFAULT
assert call.pop("retry_on_status") is DEFAULT
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
}

# Can be overwritten with .options()
Expand All @@ -157,13 +157,12 @@ def test_options_passed_to_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][1]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -184,13 +183,12 @@ def test_options_passed_to_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -213,13 +211,12 @@ async def test_options_passed_to_async_perform_request(self):
assert call.pop("retry_on_timeout") is DEFAULT
assert call.pop("retry_on_status") is DEFAULT
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
}

# Can be overwritten with .options()
Expand All @@ -233,13 +230,12 @@ async def test_options_passed_to_async_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][1]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -260,13 +256,12 @@ async def test_options_passed_to_async_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand Down Expand Up @@ -397,13 +392,12 @@ def test_options_timeout_parameters(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -428,13 +422,12 @@ def test_options_timeout_parameters(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 2,
"max_retries": 3,
"retry_on_status": (400,),
Expand All @@ -454,13 +447,12 @@ def test_options_timeout_parameters(self):
assert call.pop("retry_on_timeout") is DEFAULT
assert call.pop("retry_on_status") is DEFAULT
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
}

client = Elasticsearch(
Expand All @@ -477,13 +469,12 @@ def test_options_timeout_parameters(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand Down
Loading