Skip to content

Commit c4dda0d

Browse files
committed
Record OpenTelemetry spans
1 parent 8785889 commit c4dda0d

File tree

5 files changed

+260
-0
lines changed

5 files changed

+260
-0
lines changed

elasticsearch_serverless/_async/client/_base.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
HttpHeaders,
2828
ListApiResponse,
2929
ObjectApiResponse,
30+
OpenTelemetrySpan,
3031
TextApiResponse,
3132
)
3233
from elastic_transport.client_utils import DEFAULT, DefaultType
3334

35+
from ..._otel import OpenTelemetry
3436
from ...compat import warn_stacklevel
3537
from ...exceptions import (
3638
HTTP_EXCEPTIONS,
@@ -125,6 +127,7 @@ def __init__(self, _transport: AsyncTransport) -> None:
125127
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
126128
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
127129
self._verified_elasticsearch = False
130+
self._otel = OpenTelemetry()
128131

129132
@property
130133
def transport(self) -> AsyncTransport:
@@ -140,6 +143,32 @@ async def perform_request(
140143
body: Optional[Any] = None,
141144
endpoint_id: Optional[str] = None,
142145
path_parts: Optional[Mapping[str, Any]] = None,
146+
) -> ApiResponse[Any]:
147+
with self._otel.span(
148+
method,
149+
endpoint_id=endpoint_id,
150+
path_parts=path_parts or {},
151+
) as otel_span:
152+
response = await self._perform_request(
153+
method,
154+
path,
155+
params=params,
156+
headers=headers,
157+
body=body,
158+
otel_span=otel_span,
159+
)
160+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
161+
return response
162+
163+
async def _perform_request(
164+
self,
165+
method: str,
166+
path: str,
167+
*,
168+
params: Optional[Mapping[str, Any]] = None,
169+
headers: Optional[Mapping[str, str]] = None,
170+
body: Optional[Any] = None,
171+
otel_span: OpenTelemetrySpan,
143172
) -> ApiResponse[Any]:
144173
if headers:
145174
request_headers = self._headers.copy()
@@ -162,6 +191,7 @@ async def perform_request(
162191
retry_on_status=self._retry_on_status,
163192
retry_on_timeout=self._retry_on_timeout,
164193
client_meta=self._client_meta,
194+
otel_span=otel_span,
165195
)
166196

167197
# HEAD with a 404 is returned as a normal response

elasticsearch_serverless/_otel.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import contextlib
21+
import os
22+
from typing import TYPE_CHECKING, Generator, Mapping
23+
24+
if TYPE_CHECKING:
25+
from typing import Literal
26+
27+
try:
28+
from opentelemetry import trace
29+
30+
_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
31+
except ModuleNotFoundError:
32+
_tracer = None
33+
34+
from elastic_transport import OpenTelemetrySpan
35+
36+
# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
37+
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
38+
# Describes how to handle search queries in the request body when assigned to
39+
# a span attribute.
40+
# Valid values are 'omit' and 'raw'.
41+
# Default is 'omit' as 'raw' has security implications.
42+
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
43+
DEFAULT_BODY_STRATEGY = "omit"
44+
45+
46+
class OpenTelemetry:
47+
def __init__(
48+
self,
49+
enabled: bool | None = None,
50+
tracer: trace.Tracer | None = None,
51+
# TODO import Literal at the top-level when dropping Python 3.7
52+
body_strategy: 'Literal["omit", "raw"]' | None = None,
53+
):
54+
if enabled is None:
55+
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
56+
self.tracer = tracer or _tracer
57+
self.enabled = enabled and self.tracer is not None
58+
59+
if body_strategy is not None:
60+
self.body_strategy = body_strategy
61+
else:
62+
self.body_strategy = os.environ.get(
63+
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
64+
) # type: ignore[assignment]
65+
assert self.body_strategy in ("omit", "raw")
66+
67+
@contextlib.contextmanager
68+
def span(
69+
self,
70+
method: str,
71+
*,
72+
endpoint_id: str | None,
73+
path_parts: Mapping[str, str],
74+
) -> Generator[OpenTelemetrySpan, None, None]:
75+
if not self.enabled or self.tracer is None:
76+
yield OpenTelemetrySpan(None)
77+
return
78+
79+
span_name = endpoint_id or method
80+
with self.tracer.start_as_current_span(span_name) as otel_span:
81+
otel_span.set_attribute("http.request.method", method)
82+
otel_span.set_attribute("db.system", "elasticsearch")
83+
if endpoint_id is not None:
84+
otel_span.set_attribute("db.operation", endpoint_id)
85+
for key, value in path_parts.items():
86+
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
87+
88+
yield OpenTelemetrySpan(
89+
otel_span,
90+
endpoint_id=endpoint_id,
91+
body_strategy=self.body_strategy,
92+
)

elasticsearch_serverless/_sync/client/_base.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
HttpHeaders,
2727
ListApiResponse,
2828
ObjectApiResponse,
29+
OpenTelemetrySpan,
2930
TextApiResponse,
3031
Transport,
3132
)
3233
from elastic_transport.client_utils import DEFAULT, DefaultType
3334

35+
from ..._otel import OpenTelemetry
3436
from ...compat import warn_stacklevel
3537
from ...exceptions import (
3638
HTTP_EXCEPTIONS,
@@ -125,6 +127,7 @@ def __init__(self, _transport: Transport) -> None:
125127
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
126128
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
127129
self._verified_elasticsearch = False
130+
self._otel = OpenTelemetry()
128131

129132
@property
130133
def transport(self) -> Transport:
@@ -140,6 +143,32 @@ def perform_request(
140143
body: Optional[Any] = None,
141144
endpoint_id: Optional[str] = None,
142145
path_parts: Optional[Mapping[str, Any]] = None,
146+
) -> ApiResponse[Any]:
147+
with self._otel.span(
148+
method,
149+
endpoint_id=endpoint_id,
150+
path_parts=path_parts or {},
151+
) as otel_span:
152+
response = self._perform_request(
153+
method,
154+
path,
155+
params=params,
156+
headers=headers,
157+
body=body,
158+
otel_span=otel_span,
159+
)
160+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
161+
return response
162+
163+
def _perform_request(
164+
self,
165+
method: str,
166+
path: str,
167+
*,
168+
params: Optional[Mapping[str, Any]] = None,
169+
headers: Optional[Mapping[str, str]] = None,
170+
body: Optional[Any] = None,
171+
otel_span: OpenTelemetrySpan,
143172
) -> ApiResponse[Any]:
144173
if headers:
145174
request_headers = self._headers.copy()
@@ -162,6 +191,7 @@ def perform_request(
162191
retry_on_status=self._retry_on_status,
163192
retry_on_timeout=self._retry_on_timeout,
164193
client_meta=self._client_meta,
194+
otel_span=otel_span,
165195
)
166196

167197
# HEAD with a 404 is returned as a normal response

test_elasticsearch_serverless/test_client/test_options.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818

1919
import pytest
20+
from elastic_transport import OpenTelemetrySpan
2021
from elastic_transport.client_utils import DEFAULT
2122

2223
from elasticsearch_serverless import AsyncElasticsearch, Elasticsearch
@@ -137,6 +138,7 @@ def test_options_passed_to_perform_request(self):
137138
assert call.pop("retry_on_timeout") is DEFAULT
138139
assert call.pop("retry_on_status") is DEFAULT
139140
assert call.pop("client_meta") is DEFAULT
141+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
140142
assert call == {
141143
"headers": {
142144
"accept": "application/json",
@@ -155,6 +157,7 @@ def test_options_passed_to_perform_request(self):
155157
calls = client.transport.calls
156158
call = calls[("GET", "/test")][1]
157159
assert call.pop("client_meta") is DEFAULT
160+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
158161
assert call == {
159162
"headers": {
160163
"accept": "application/json",
@@ -180,6 +183,7 @@ def test_options_passed_to_perform_request(self):
180183
calls = client.transport.calls
181184
call = calls[("GET", "/test")][0]
182185
assert call.pop("client_meta") is DEFAULT
186+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
183187
assert call == {
184188
"headers": {
185189
"accept": "application/json",
@@ -207,6 +211,7 @@ async def test_options_passed_to_async_perform_request(self):
207211
assert call.pop("retry_on_timeout") is DEFAULT
208212
assert call.pop("retry_on_status") is DEFAULT
209213
assert call.pop("client_meta") is DEFAULT
214+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
210215
assert call == {
211216
"headers": {
212217
"accept": "application/json",
@@ -225,6 +230,7 @@ async def test_options_passed_to_async_perform_request(self):
225230
calls = client.transport.calls
226231
call = calls[("GET", "/test")][1]
227232
assert call.pop("client_meta") is DEFAULT
233+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
228234
assert call == {
229235
"headers": {
230236
"accept": "application/json",
@@ -250,6 +256,7 @@ async def test_options_passed_to_async_perform_request(self):
250256
calls = client.transport.calls
251257
call = calls[("GET", "/test")][0]
252258
assert call.pop("client_meta") is DEFAULT
259+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
253260
assert call == {
254261
"headers": {
255262
"accept": "application/json",
@@ -389,6 +396,7 @@ def test_options_timeout_parameters(self):
389396
calls = client.transport.calls
390397
call = calls[("GET", "/test")][0]
391398
assert call.pop("client_meta") is DEFAULT
399+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
392400
assert call == {
393401
"headers": {
394402
"accept": "application/json",
@@ -418,6 +426,7 @@ def test_options_timeout_parameters(self):
418426
calls = client.transport.calls
419427
call = calls[("GET", "/test")][0]
420428
assert call.pop("client_meta") is DEFAULT
429+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
421430
assert call == {
422431
"headers": {
423432
"accept": "application/json",
@@ -442,6 +451,7 @@ def test_options_timeout_parameters(self):
442451
assert call.pop("retry_on_timeout") is DEFAULT
443452
assert call.pop("retry_on_status") is DEFAULT
444453
assert call.pop("client_meta") is DEFAULT
454+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
445455
assert call == {
446456
"headers": {
447457
"accept": "application/json",
@@ -463,6 +473,7 @@ def test_options_timeout_parameters(self):
463473
calls = client.transport.calls
464474
call = calls[("GET", "/test")][0]
465475
assert call.pop("client_meta") is DEFAULT
476+
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
466477
assert call == {
467478
"headers": {
468479
"accept": "application/json",

0 commit comments

Comments
 (0)