Skip to content

Commit 4cdff67

Browse files
[Backport 8.13] Move OpenTelemetry context manager to main client (#2496)
That way, the deserialization will be part of the span. (cherry picked from commit b9aa69e) Co-authored-by: Quentin Pradet <quentin.pradet@elastic.co>
1 parent 9eae453 commit 4cdff67

File tree

5 files changed

+260
-24
lines changed

5 files changed

+260
-24
lines changed

elasticsearch/_async/client/_base.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@
3939
ListApiResponse,
4040
NodeConfig,
4141
ObjectApiResponse,
42+
OpenTelemetrySpan,
4243
SniffOptions,
4344
TextApiResponse,
4445
)
4546
from elastic_transport.client_utils import DEFAULT, DefaultType
4647

48+
from ..._otel import OpenTelemetry
4749
from ..._version import __versionstr__
4850
from ...compat import warn_stacklevel
4951
from ...exceptions import (
@@ -244,6 +246,7 @@ def __init__(self, _transport: AsyncTransport) -> None:
244246
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
245247
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
246248
self._verified_elasticsearch = False
249+
self._otel = OpenTelemetry()
247250

248251
@property
249252
def transport(self) -> AsyncTransport:
@@ -259,6 +262,32 @@ async def perform_request(
259262
body: Optional[Any] = None,
260263
endpoint_id: Optional[str] = None,
261264
path_parts: Optional[Mapping[str, Any]] = None,
265+
) -> ApiResponse[Any]:
266+
with self._otel.span(
267+
method,
268+
endpoint_id=endpoint_id,
269+
path_parts=path_parts or {},
270+
) as otel_span:
271+
response = await self._perform_request(
272+
method,
273+
path,
274+
params=params,
275+
headers=headers,
276+
body=body,
277+
otel_span=otel_span,
278+
)
279+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
280+
return response
281+
282+
async def _perform_request(
283+
self,
284+
method: str,
285+
path: str,
286+
*,
287+
params: Optional[Mapping[str, Any]] = None,
288+
headers: Optional[Mapping[str, str]] = None,
289+
body: Optional[Any] = None,
290+
otel_span: OpenTelemetrySpan,
262291
) -> ApiResponse[Any]:
263292
if headers:
264293
request_headers = self._headers.copy()
@@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
294323
retry_on_status=self._retry_on_status,
295324
retry_on_timeout=self._retry_on_timeout,
296325
client_meta=self._client_meta,
297-
endpoint_id=endpoint_id,
298-
path_parts=path_parts,
326+
otel_span=otel_span,
299327
)
300328

301329
# HEAD with a 404 is returned as a normal response

elasticsearch/_otel.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import contextlib
21+
import os
22+
from typing import TYPE_CHECKING, Generator, Mapping
23+
24+
if TYPE_CHECKING:
25+
from typing import Literal
26+
27+
try:
28+
from opentelemetry import trace
29+
30+
_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
31+
except ModuleNotFoundError:
32+
_tracer = None
33+
34+
from elastic_transport import OpenTelemetrySpan
35+
36+
# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
37+
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
38+
# Describes how to handle search queries in the request body when assigned to
39+
# a span attribute.
40+
# Valid values are 'omit' and 'raw'.
41+
# Default is 'omit' as 'raw' has security implications.
42+
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
43+
DEFAULT_BODY_STRATEGY = "omit"
44+
45+
46+
class OpenTelemetry:
47+
def __init__(
48+
self,
49+
enabled: bool | None = None,
50+
tracer: trace.Tracer | None = None,
51+
# TODO import Literal at the top-level when dropping Python 3.7
52+
body_strategy: 'Literal["omit", "raw"]' | None = None,
53+
):
54+
if enabled is None:
55+
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
56+
self.tracer = tracer or _tracer
57+
self.enabled = enabled and self.tracer is not None
58+
59+
if body_strategy is not None:
60+
self.body_strategy = body_strategy
61+
else:
62+
self.body_strategy = os.environ.get(
63+
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
64+
) # type: ignore[assignment]
65+
assert self.body_strategy in ("omit", "raw")
66+
67+
@contextlib.contextmanager
68+
def span(
69+
self,
70+
method: str,
71+
*,
72+
endpoint_id: str | None,
73+
path_parts: Mapping[str, str],
74+
) -> Generator[OpenTelemetrySpan, None, None]:
75+
if not self.enabled or self.tracer is None:
76+
yield OpenTelemetrySpan(None)
77+
return
78+
79+
span_name = endpoint_id or method
80+
with self.tracer.start_as_current_span(span_name) as otel_span:
81+
otel_span.set_attribute("http.request.method", method)
82+
otel_span.set_attribute("db.system", "elasticsearch")
83+
if endpoint_id is not None:
84+
otel_span.set_attribute("db.operation", endpoint_id)
85+
for key, value in path_parts.items():
86+
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
87+
88+
yield OpenTelemetrySpan(
89+
otel_span,
90+
endpoint_id=endpoint_id,
91+
body_strategy=self.body_strategy,
92+
)

elasticsearch/_sync/client/_base.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@
3838
ListApiResponse,
3939
NodeConfig,
4040
ObjectApiResponse,
41+
OpenTelemetrySpan,
4142
SniffOptions,
4243
TextApiResponse,
4344
Transport,
4445
)
4546
from elastic_transport.client_utils import DEFAULT, DefaultType
4647

48+
from ..._otel import OpenTelemetry
4749
from ..._version import __versionstr__
4850
from ...compat import warn_stacklevel
4951
from ...exceptions import (
@@ -244,6 +246,7 @@ def __init__(self, _transport: Transport) -> None:
244246
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
245247
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
246248
self._verified_elasticsearch = False
249+
self._otel = OpenTelemetry()
247250

248251
@property
249252
def transport(self) -> Transport:
@@ -259,6 +262,32 @@ def perform_request(
259262
body: Optional[Any] = None,
260263
endpoint_id: Optional[str] = None,
261264
path_parts: Optional[Mapping[str, Any]] = None,
265+
) -> ApiResponse[Any]:
266+
with self._otel.span(
267+
method,
268+
endpoint_id=endpoint_id,
269+
path_parts=path_parts or {},
270+
) as otel_span:
271+
response = self._perform_request(
272+
method,
273+
path,
274+
params=params,
275+
headers=headers,
276+
body=body,
277+
otel_span=otel_span,
278+
)
279+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
280+
return response
281+
282+
def _perform_request(
283+
self,
284+
method: str,
285+
path: str,
286+
*,
287+
params: Optional[Mapping[str, Any]] = None,
288+
headers: Optional[Mapping[str, str]] = None,
289+
body: Optional[Any] = None,
290+
otel_span: OpenTelemetrySpan,
262291
) -> ApiResponse[Any]:
263292
if headers:
264293
request_headers = self._headers.copy()
@@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
294323
retry_on_status=self._retry_on_status,
295324
retry_on_timeout=self._retry_on_timeout,
296325
client_meta=self._client_meta,
297-
endpoint_id=endpoint_id,
298-
path_parts=path_parts,
326+
otel_span=otel_span,
299327
)
300328

301329
# HEAD with a 404 is returned as a normal response

test_elasticsearch/test_client/test_options.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818

1919
import pytest
20+
from elastic_transport import OpenTelemetrySpan
2021
from elastic_transport.client_utils import DEFAULT
2122

2223
from elasticsearch import AsyncElasticsearch, Elasticsearch
@@ -137,13 +138,12 @@ def test_options_passed_to_perform_request(self):
137138
assert call.pop("retry_on_timeout") is DEFAULT
138139
assert call.pop("retry_on_status") is DEFAULT
139140
assert call.pop("client_meta") is DEFAULT
141+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
140142
assert call == {
141143
"headers": {
142144
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
143145
},
144146
"body": None,
145-
"endpoint_id": "indices.get",
146-
"path_parts": {"index": "test"},
147147
}
148148

149149
# Can be overwritten with .options()
@@ -157,13 +157,12 @@ def test_options_passed_to_perform_request(self):
157157
calls = client.transport.calls
158158
call = calls[("GET", "/test")][1]
159159
assert call.pop("client_meta") is DEFAULT
160+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
160161
assert call == {
161162
"headers": {
162163
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
163164
},
164165
"body": None,
165-
"endpoint_id": "indices.get",
166-
"path_parts": {"index": "test"},
167166
"request_timeout": 1,
168167
"max_retries": 2,
169168
"retry_on_status": (404,),
@@ -184,13 +183,12 @@ def test_options_passed_to_perform_request(self):
184183
calls = client.transport.calls
185184
call = calls[("GET", "/test")][0]
186185
assert call.pop("client_meta") is DEFAULT
186+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
187187
assert call == {
188188
"headers": {
189189
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
190190
},
191191
"body": None,
192-
"endpoint_id": "indices.get",
193-
"path_parts": {"index": "test"},
194192
"request_timeout": 1,
195193
"max_retries": 2,
196194
"retry_on_status": (404,),
@@ -213,13 +211,12 @@ async def test_options_passed_to_async_perform_request(self):
213211
assert call.pop("retry_on_timeout") is DEFAULT
214212
assert call.pop("retry_on_status") is DEFAULT
215213
assert call.pop("client_meta") is DEFAULT
214+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
216215
assert call == {
217216
"headers": {
218217
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
219218
},
220219
"body": None,
221-
"endpoint_id": "indices.get",
222-
"path_parts": {"index": "test"},
223220
}
224221

225222
# Can be overwritten with .options()
@@ -233,13 +230,12 @@ async def test_options_passed_to_async_perform_request(self):
233230
calls = client.transport.calls
234231
call = calls[("GET", "/test")][1]
235232
assert call.pop("client_meta") is DEFAULT
233+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
236234
assert call == {
237235
"headers": {
238236
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
239237
},
240238
"body": None,
241-
"endpoint_id": "indices.get",
242-
"path_parts": {"index": "test"},
243239
"request_timeout": 1,
244240
"max_retries": 2,
245241
"retry_on_status": (404,),
@@ -260,13 +256,12 @@ async def test_options_passed_to_async_perform_request(self):
260256
calls = client.transport.calls
261257
call = calls[("GET", "/test")][0]
262258
assert call.pop("client_meta") is DEFAULT
259+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
263260
assert call == {
264261
"headers": {
265262
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
266263
},
267264
"body": None,
268-
"endpoint_id": "indices.get",
269-
"path_parts": {"index": "test"},
270265
"request_timeout": 1,
271266
"max_retries": 2,
272267
"retry_on_status": (404,),
@@ -397,13 +392,12 @@ def test_options_timeout_parameters(self):
397392
calls = client.transport.calls
398393
call = calls[("GET", "/test")][0]
399394
assert call.pop("client_meta") is DEFAULT
395+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
400396
assert call == {
401397
"headers": {
402398
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
403399
},
404400
"body": None,
405-
"endpoint_id": "indices.get",
406-
"path_parts": {"index": "test"},
407401
"request_timeout": 1,
408402
"max_retries": 2,
409403
"retry_on_status": (404,),
@@ -428,13 +422,12 @@ def test_options_timeout_parameters(self):
428422
calls = client.transport.calls
429423
call = calls[("GET", "/test")][0]
430424
assert call.pop("client_meta") is DEFAULT
425+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
431426
assert call == {
432427
"headers": {
433428
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
434429
},
435430
"body": None,
436-
"endpoint_id": "indices.get",
437-
"path_parts": {"index": "test"},
438431
"request_timeout": 2,
439432
"max_retries": 3,
440433
"retry_on_status": (400,),
@@ -454,13 +447,12 @@ def test_options_timeout_parameters(self):
454447
assert call.pop("retry_on_timeout") is DEFAULT
455448
assert call.pop("retry_on_status") is DEFAULT
456449
assert call.pop("client_meta") is DEFAULT
450+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
457451
assert call == {
458452
"headers": {
459453
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
460454
},
461455
"body": None,
462-
"endpoint_id": "indices.get",
463-
"path_parts": {"index": "test"},
464456
}
465457

466458
client = Elasticsearch(
@@ -477,13 +469,12 @@ def test_options_timeout_parameters(self):
477469
calls = client.transport.calls
478470
call = calls[("GET", "/test")][0]
479471
assert call.pop("client_meta") is DEFAULT
472+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
480473
assert call == {
481474
"headers": {
482475
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
483476
},
484477
"body": None,
485-
"endpoint_id": "indices.get",
486-
"path_parts": {"index": "test"},
487478
"request_timeout": 1,
488479
"max_retries": 2,
489480
"retry_on_status": (404,),

0 commit comments

Comments
 (0)