diff --git a/elasticsearch/_async/client/_base.py b/elasticsearch/_async/client/_base.py index b5a0cc495..dd0b0f44e 100644 --- a/elasticsearch/_async/client/_base.py +++ b/elasticsearch/_async/client/_base.py @@ -39,11 +39,13 @@ ListApiResponse, NodeConfig, ObjectApiResponse, + OpenTelemetrySpan, SniffOptions, TextApiResponse, ) from elastic_transport.client_utils import DEFAULT, DefaultType +from ..._otel import OpenTelemetry from ..._version import __versionstr__ from ...compat import warn_stacklevel from ...exceptions import ( @@ -244,6 +246,7 @@ def __init__(self, _transport: AsyncTransport) -> None: self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT self._verified_elasticsearch = False + self._otel = OpenTelemetry() @property def transport(self) -> AsyncTransport: @@ -259,6 +262,32 @@ async def perform_request( body: Optional[Any] = None, endpoint_id: Optional[str] = None, path_parts: Optional[Mapping[str, Any]] = None, + ) -> ApiResponse[Any]: + with self._otel.span( + method, + endpoint_id=endpoint_id, + path_parts=path_parts or {}, + ) as otel_span: + response = await self._perform_request( + method, + path, + params=params, + headers=headers, + body=body, + otel_span=otel_span, + ) + otel_span.set_elastic_cloud_metadata(response.meta.headers) + return response + + async def _perform_request( + self, + method: str, + path: str, + *, + params: Optional[Mapping[str, Any]] = None, + headers: Optional[Mapping[str, str]] = None, + body: Optional[Any] = None, + otel_span: OpenTelemetrySpan, ) -> ApiResponse[Any]: if headers: request_headers = self._headers.copy() @@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None: retry_on_status=self._retry_on_status, retry_on_timeout=self._retry_on_timeout, client_meta=self._client_meta, - endpoint_id=endpoint_id, - path_parts=path_parts, + otel_span=otel_span, ) # HEAD with a 404 is returned as a normal response diff --git a/elasticsearch/_otel.py b/elasticsearch/_otel.py new file mode 100644 index 000000000..1f052b660 --- /dev/null +++ b/elasticsearch/_otel.py @@ -0,0 +1,92 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import contextlib +import os +from typing import TYPE_CHECKING, Generator, Mapping + +if TYPE_CHECKING: + from typing import Literal + +try: + from opentelemetry import trace + + _tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api") +except ModuleNotFoundError: + _tracer = None + +from elastic_transport import OpenTelemetrySpan + +# Valid values for the enabled config are 'true' and 'false'. Default is 'true'. +ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED" +# Describes how to handle search queries in the request body when assigned to +# a span attribute. +# Valid values are 'omit' and 'raw'. +# Default is 'omit' as 'raw' has security implications. +BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY" +DEFAULT_BODY_STRATEGY = "omit" + + +class OpenTelemetry: + def __init__( + self, + enabled: bool | None = None, + tracer: trace.Tracer | None = None, + # TODO import Literal at the top-level when dropping Python 3.7 + body_strategy: 'Literal["omit", "raw"]' | None = None, + ): + if enabled is None: + enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false" + self.tracer = tracer or _tracer + self.enabled = enabled and self.tracer is not None + + if body_strategy is not None: + self.body_strategy = body_strategy + else: + self.body_strategy = os.environ.get( + BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY + ) # type: ignore[assignment] + assert self.body_strategy in ("omit", "raw") + + @contextlib.contextmanager + def span( + self, + method: str, + *, + endpoint_id: str | None, + path_parts: Mapping[str, str], + ) -> Generator[OpenTelemetrySpan, None, None]: + if not self.enabled or self.tracer is None: + yield OpenTelemetrySpan(None) + return + + span_name = endpoint_id or method + with self.tracer.start_as_current_span(span_name) as otel_span: + otel_span.set_attribute("http.request.method", method) + otel_span.set_attribute("db.system", "elasticsearch") + if endpoint_id is not None: + otel_span.set_attribute("db.operation", endpoint_id) + for key, value in path_parts.items(): + otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value) + + yield OpenTelemetrySpan( + otel_span, + endpoint_id=endpoint_id, + body_strategy=self.body_strategy, + ) diff --git a/elasticsearch/_sync/client/_base.py b/elasticsearch/_sync/client/_base.py index feaaeb962..8929b1db7 100644 --- a/elasticsearch/_sync/client/_base.py +++ b/elasticsearch/_sync/client/_base.py @@ -38,12 +38,14 @@ ListApiResponse, NodeConfig, ObjectApiResponse, + OpenTelemetrySpan, SniffOptions, TextApiResponse, Transport, ) from elastic_transport.client_utils import DEFAULT, DefaultType +from ..._otel import OpenTelemetry from ..._version import __versionstr__ from ...compat import warn_stacklevel from ...exceptions import ( @@ -244,6 +246,7 @@ def __init__(self, _transport: Transport) -> None: self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT self._verified_elasticsearch = False + self._otel = OpenTelemetry() @property def transport(self) -> Transport: @@ -259,6 +262,32 @@ def perform_request( body: Optional[Any] = None, endpoint_id: Optional[str] = None, path_parts: Optional[Mapping[str, Any]] = None, + ) -> ApiResponse[Any]: + with self._otel.span( + method, + endpoint_id=endpoint_id, + path_parts=path_parts or {}, + ) as otel_span: + response = self._perform_request( + method, + path, + params=params, + headers=headers, + body=body, + otel_span=otel_span, + ) + otel_span.set_elastic_cloud_metadata(response.meta.headers) + return response + + def _perform_request( + self, + method: str, + path: str, + *, + params: Optional[Mapping[str, Any]] = None, + headers: Optional[Mapping[str, str]] = None, + body: Optional[Any] = None, + otel_span: OpenTelemetrySpan, ) -> ApiResponse[Any]: if headers: request_headers = self._headers.copy() @@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None: retry_on_status=self._retry_on_status, retry_on_timeout=self._retry_on_timeout, client_meta=self._client_meta, - endpoint_id=endpoint_id, - path_parts=path_parts, + otel_span=otel_span, ) # HEAD with a 404 is returned as a normal response diff --git a/test_elasticsearch/test_client/test_options.py b/test_elasticsearch/test_client/test_options.py index b8a9affbb..adf7a1d0d 100644 --- a/test_elasticsearch/test_client/test_options.py +++ b/test_elasticsearch/test_client/test_options.py @@ -17,6 +17,7 @@ # under the License. import pytest +from elastic_transport import OpenTelemetrySpan from elastic_transport.client_utils import DEFAULT from elasticsearch import AsyncElasticsearch, Elasticsearch @@ -137,13 +138,12 @@ def test_options_passed_to_perform_request(self): assert call.pop("retry_on_timeout") is DEFAULT assert call.pop("retry_on_status") is DEFAULT assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, } # Can be overwritten with .options() @@ -157,13 +157,12 @@ def test_options_passed_to_perform_request(self): calls = client.transport.calls call = calls[("GET", "/test")][1] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 1, "max_retries": 2, "retry_on_status": (404,), @@ -184,13 +183,12 @@ def test_options_passed_to_perform_request(self): calls = client.transport.calls call = calls[("GET", "/test")][0] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 1, "max_retries": 2, "retry_on_status": (404,), @@ -213,13 +211,12 @@ async def test_options_passed_to_async_perform_request(self): assert call.pop("retry_on_timeout") is DEFAULT assert call.pop("retry_on_status") is DEFAULT assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, } # Can be overwritten with .options() @@ -233,13 +230,12 @@ async def test_options_passed_to_async_perform_request(self): calls = client.transport.calls call = calls[("GET", "/test")][1] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 1, "max_retries": 2, "retry_on_status": (404,), @@ -260,13 +256,12 @@ async def test_options_passed_to_async_perform_request(self): calls = client.transport.calls call = calls[("GET", "/test")][0] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 1, "max_retries": 2, "retry_on_status": (404,), @@ -397,13 +392,12 @@ def test_options_timeout_parameters(self): calls = client.transport.calls call = calls[("GET", "/test")][0] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 1, "max_retries": 2, "retry_on_status": (404,), @@ -428,13 +422,12 @@ def test_options_timeout_parameters(self): calls = client.transport.calls call = calls[("GET", "/test")][0] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 2, "max_retries": 3, "retry_on_status": (400,), @@ -454,13 +447,12 @@ def test_options_timeout_parameters(self): assert call.pop("retry_on_timeout") is DEFAULT assert call.pop("retry_on_status") is DEFAULT assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, } client = Elasticsearch( @@ -477,13 +469,12 @@ def test_options_timeout_parameters(self): calls = client.transport.calls call = calls[("GET", "/test")][0] assert call.pop("client_meta") is DEFAULT + assert isinstance(call.pop("otel_span"), OpenTelemetrySpan) assert call == { "headers": { "accept": "application/vnd.elasticsearch+json; compatible-with=8", }, "body": None, - "endpoint_id": "indices.get", - "path_parts": {"index": "test"}, "request_timeout": 1, "max_retries": 2, "retry_on_status": (404,), diff --git a/test_elasticsearch/test_otel.py b/test_elasticsearch/test_otel.py new file mode 100644 index 000000000..18e932a9a --- /dev/null +++ b/test_elasticsearch/test_otel.py @@ -0,0 +1,97 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os + +import pytest + +try: + from opentelemetry.sdk.trace import TracerProvider, export + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) +except ModuleNotFoundError: + pass + + +from elasticsearch._otel import ENABLED_ENV_VAR, OpenTelemetry + +pytestmark = [ + pytest.mark.skipif( + "TEST_WITH_OTEL" not in os.environ, reason="TEST_WITH_OTEL is not set" + ), + pytest.mark.otel, +] + + +def setup_tracing(): + tracer_provider = TracerProvider() + memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + return tracer, memory_exporter + + +def test_enabled(): + otel = OpenTelemetry() + assert otel.enabled == (os.environ.get(ENABLED_ENV_VAR, "false") != "false") + + +def test_minimal_span(): + tracer, memory_exporter = setup_tracing() + + otel = OpenTelemetry(enabled=True, tracer=tracer) + with otel.span("GET", endpoint_id=None, path_parts={}): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "GET" + assert spans[0].attributes == { + "http.request.method": "GET", + "db.system": "elasticsearch", + } + + +def test_detailed_span(): + tracer, memory_exporter = setup_tracing() + otel = OpenTelemetry(enabled=True, tracer=tracer) + with otel.span( + "GET", + endpoint_id="ml.open_job", + path_parts={"job_id": "my-job"}, + ) as span: + span.set_elastic_cloud_metadata( + { + "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f", + "X-Found-Handling-Instance": "instance-0000000001", + } + ) + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "ml.open_job" + assert spans[0].attributes == { + "http.request.method": "GET", + "db.system": "elasticsearch", + "db.operation": "ml.open_job", + "db.elasticsearch.path_parts.job_id": "my-job", + "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", + "db.elasticsearch.node.name": "instance-0000000001", + }