Skip to content

Commit d5cdfc2

Browse files
committed
Move OpenTelemetry context manager to main client
That way, the deserialization will be part of the span.
1 parent dc61799 commit d5cdfc2

File tree

4 files changed

+250
-4
lines changed

4 files changed

+250
-4
lines changed

elasticsearch/_async/client/_base.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@
3939
ListApiResponse,
4040
NodeConfig,
4141
ObjectApiResponse,
42+
OpenTelemetrySpan,
4243
SniffOptions,
4344
TextApiResponse,
4445
)
4546
from elastic_transport.client_utils import DEFAULT, DefaultType
4647

48+
from ..._otel import OpenTelemetry
4749
from ..._version import __versionstr__
4850
from ...compat import warn_stacklevel
4951
from ...exceptions import (
@@ -244,6 +246,7 @@ def __init__(self, _transport: AsyncTransport) -> None:
244246
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
245247
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
246248
self._verified_elasticsearch = False
249+
self._otel = OpenTelemetry()
247250

248251
@property
249252
def transport(self) -> AsyncTransport:
@@ -259,6 +262,32 @@ async def perform_request(
259262
body: Optional[Any] = None,
260263
endpoint_id: Optional[str] = None,
261264
path_parts: Optional[Mapping[str, Any]] = None,
265+
) -> ApiResponse[Any]:
266+
with self._otel.span(
267+
method,
268+
endpoint_id=endpoint_id,
269+
path_parts=path_parts or {},
270+
) as otel_span:
271+
response = await self._perform_request(
272+
method,
273+
path,
274+
params=params,
275+
headers=headers,
276+
body=body,
277+
otel_span=otel_span,
278+
)
279+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
280+
return response
281+
282+
async def _perform_request(
283+
self,
284+
method: str,
285+
path: str,
286+
*,
287+
params: Optional[Mapping[str, Any]] = None,
288+
headers: Optional[Mapping[str, str]] = None,
289+
body: Optional[Any] = None,
290+
otel_span: OpenTelemetrySpan,
262291
) -> ApiResponse[Any]:
263292
if headers:
264293
request_headers = self._headers.copy()
@@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
294323
retry_on_status=self._retry_on_status,
295324
retry_on_timeout=self._retry_on_timeout,
296325
client_meta=self._client_meta,
297-
endpoint_id=endpoint_id,
298-
path_parts=path_parts,
326+
otel_span=otel_span,
299327
)
300328

301329
# HEAD with a 404 is returned as a normal response

elasticsearch/_otel.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import contextlib
21+
import os
22+
from typing import TYPE_CHECKING, Generator, Mapping
23+
24+
if TYPE_CHECKING:
25+
from typing import Literal
26+
27+
try:
28+
from opentelemetry import trace
29+
30+
_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
31+
except ModuleNotFoundError:
32+
_tracer = None
33+
34+
from elastic_transport import OpenTelemetrySpan
35+
36+
# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
37+
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
38+
# Describes how to handle search queries in the request body when assigned to
39+
# a span attribute.
40+
# Valid values are 'omit' and 'raw'.
41+
# Default is 'omit' as 'raw' has security implications.
42+
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
43+
DEFAULT_BODY_STRATEGY = "omit"
44+
45+
46+
class OpenTelemetry:
47+
def __init__(
48+
self,
49+
enabled: bool | None = None,
50+
tracer: trace.Tracer | None = None,
51+
# TODO import Literal at the top-level when dropping Python 3.7
52+
body_strategy: 'Literal["omit", "raw"]' | None = None,
53+
):
54+
if enabled is None:
55+
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
56+
self.tracer = tracer or _tracer
57+
self.enabled = enabled and self.tracer is not None
58+
59+
if body_strategy is not None:
60+
self.body_strategy = body_strategy
61+
else:
62+
self.body_strategy = os.environ.get(
63+
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
64+
) # type: ignore[assignment]
65+
assert self.body_strategy in ("omit", "raw")
66+
67+
@contextlib.contextmanager
68+
def span(
69+
self,
70+
method: str,
71+
*,
72+
endpoint_id: str | None,
73+
path_parts: Mapping[str, str],
74+
) -> Generator[OpenTelemetrySpan, None, None]:
75+
if not self.enabled or self.tracer is None:
76+
yield OpenTelemetrySpan(None)
77+
return
78+
79+
span_name = endpoint_id or method
80+
with self.tracer.start_as_current_span(span_name) as otel_span:
81+
otel_span.set_attribute("http.request.method", method)
82+
otel_span.set_attribute("db.system", "elasticsearch")
83+
if endpoint_id is not None:
84+
otel_span.set_attribute("db.operation", endpoint_id)
85+
for key, value in path_parts.items():
86+
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
87+
88+
yield OpenTelemetrySpan(
89+
otel_span,
90+
endpoint_id=endpoint_id,
91+
body_strategy=self.body_strategy,
92+
)

elasticsearch/_sync/client/_base.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@
3838
ListApiResponse,
3939
NodeConfig,
4040
ObjectApiResponse,
41+
OpenTelemetrySpan,
4142
SniffOptions,
4243
TextApiResponse,
4344
Transport,
4445
)
4546
from elastic_transport.client_utils import DEFAULT, DefaultType
4647

48+
from ..._otel import OpenTelemetry
4749
from ..._version import __versionstr__
4850
from ...compat import warn_stacklevel
4951
from ...exceptions import (
@@ -244,6 +246,7 @@ def __init__(self, _transport: Transport) -> None:
244246
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
245247
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
246248
self._verified_elasticsearch = False
249+
self._otel = OpenTelemetry()
247250

248251
@property
249252
def transport(self) -> Transport:
@@ -259,6 +262,32 @@ def perform_request(
259262
body: Optional[Any] = None,
260263
endpoint_id: Optional[str] = None,
261264
path_parts: Optional[Mapping[str, Any]] = None,
265+
) -> ApiResponse[Any]:
266+
with self._otel.span(
267+
method,
268+
endpoint_id=endpoint_id,
269+
path_parts=path_parts or {},
270+
) as otel_span:
271+
response = self._perform_request(
272+
method,
273+
path,
274+
params=params,
275+
headers=headers,
276+
body=body,
277+
otel_span=otel_span,
278+
)
279+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
280+
return response
281+
282+
def _perform_request(
283+
self,
284+
method: str,
285+
path: str,
286+
*,
287+
params: Optional[Mapping[str, Any]] = None,
288+
headers: Optional[Mapping[str, str]] = None,
289+
body: Optional[Any] = None,
290+
otel_span: OpenTelemetrySpan,
262291
) -> ApiResponse[Any]:
263292
if headers:
264293
request_headers = self._headers.copy()
@@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
294323
retry_on_status=self._retry_on_status,
295324
retry_on_timeout=self._retry_on_timeout,
296325
client_meta=self._client_meta,
297-
endpoint_id=endpoint_id,
298-
path_parts=path_parts,
326+
otel_span=otel_span,
299327
)
300328

301329
# HEAD with a 404 is returned as a normal response

test_elasticsearch/test_otel.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import os
19+
20+
import pytest
21+
22+
try:
23+
from opentelemetry.sdk.trace import TracerProvider, export
24+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
25+
InMemorySpanExporter,
26+
)
27+
except ModuleNotFoundError:
28+
pass
29+
30+
31+
from elasticsearch import JsonSerializer
32+
from elasticsearch._otel import ENABLED_ENV_VAR, OpenTelemetry
33+
34+
pytestmark = [
35+
pytest.mark.skipif(
36+
"TEST_WITH_OTEL" not in os.environ, reason="TEST_WITH_OTEL is not set"
37+
),
38+
pytest.mark.otel,
39+
]
40+
41+
42+
def setup_tracing():
43+
tracer_provider = TracerProvider()
44+
memory_exporter = InMemorySpanExporter()
45+
span_processor = export.SimpleSpanProcessor(memory_exporter)
46+
tracer_provider.add_span_processor(span_processor)
47+
tracer = tracer_provider.get_tracer(__name__)
48+
49+
return tracer, memory_exporter
50+
51+
52+
def test_enabled():
53+
otel = OpenTelemetry()
54+
assert otel.enabled == (os.environ.get(ENABLED_ENV_VAR, "false") != "false")
55+
56+
57+
def test_minimal_span():
58+
tracer, memory_exporter = setup_tracing()
59+
60+
otel = OpenTelemetry(enabled=True, tracer=tracer)
61+
with otel.span("GET", endpoint_id=None, path_parts={}):
62+
pass
63+
64+
spans = memory_exporter.get_finished_spans()
65+
assert len(spans) == 1
66+
assert spans[0].name == "GET"
67+
assert spans[0].attributes == {
68+
"http.request.method": "GET",
69+
"db.system": "elasticsearch",
70+
}
71+
72+
73+
def test_detailed_span():
74+
tracer, memory_exporter = setup_tracing()
75+
otel = OpenTelemetry(enabled=True, tracer=tracer)
76+
with otel.span(
77+
"GET",
78+
endpoint_id="ml.open_job",
79+
path_parts={"job_id": "my-job"},
80+
) as span:
81+
span.set_elastic_cloud_metadata(
82+
{
83+
"X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f",
84+
"X-Found-Handling-Instance": "instance-0000000001",
85+
}
86+
)
87+
88+
spans = memory_exporter.get_finished_spans()
89+
assert len(spans) == 1
90+
assert spans[0].name == "ml.open_job"
91+
assert spans[0].attributes == {
92+
"http.request.method": "GET",
93+
"db.system": "elasticsearch",
94+
"db.operation": "ml.open_job",
95+
"db.elasticsearch.path_parts.job_id": "my-job",
96+
"db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f",
97+
"db.elasticsearch.node.name": "instance-0000000001",
98+
}

0 commit comments

Comments
 (0)