Skip to content

Commit 86c7498

Browse files
author
Alisson Claudino
committed
fix: forward OTEL context to subthreads in parallel bulk calls
1 parent d4eb86e commit 86c7498

File tree

3 files changed

+45
-23
lines changed

3 files changed

+45
-23
lines changed

elasticsearch/_otel.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
from typing import Generator, Literal, Mapping
2323

2424
try:
25+
from opentelemetry import context as otel_context
2526
from opentelemetry import trace
27+
from opentelemetry.trace.propagation.tracecontext import (
28+
TraceContextTextMapPropagator,
29+
)
2630

2731
_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
2832
except ModuleNotFoundError:
@@ -41,6 +45,8 @@
4145

4246

4347
class OpenTelemetry:
48+
current_context = None
49+
4450
def __init__(
4551
self,
4652
enabled: bool | None = None,
@@ -74,6 +80,8 @@ def span(
7480

7581
span_name = endpoint_id or method
7682
with self.tracer.start_as_current_span(span_name) as otel_span:
83+
self.current_context = {}
84+
TraceContextTextMapPropagator().inject(self.current_context)
7785
otel_span.set_attribute("http.request.method", method)
7886
otel_span.set_attribute("db.system", "elasticsearch")
7987
if endpoint_id is not None:
@@ -86,3 +94,16 @@ def span(
8694
endpoint_id=endpoint_id,
8795
body_strategy=self.body_strategy,
8896
)
97+
98+
@contextlib.contextmanager
99+
def recover_parent_context(self):
100+
token = None
101+
if self.current_context:
102+
otel_parent_ctx = TraceContextTextMapPropagator().extract(
103+
carrier=self.current_context
104+
)
105+
token = otel_context.attach(otel_parent_ctx)
106+
yield
107+
108+
if token:
109+
otel_context.detach(token)

elasticsearch/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
__versionstr__ = "8.14.0"
18+
__versionstr__ = "8.14.1-alpha"

elasticsearch/helpers/actions.py

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -331,28 +331,29 @@ def _process_bulk_chunk(
331331
"""
332332
Send a bulk request to elasticsearch and process the output.
333333
"""
334-
if isinstance(ignore_status, int):
335-
ignore_status = (ignore_status,)
336-
337-
try:
338-
# send the actual request
339-
resp = client.bulk(*args, operations=bulk_actions, **kwargs) # type: ignore[arg-type]
340-
except ApiError as e:
341-
gen = _process_bulk_chunk_error(
342-
error=e,
343-
bulk_data=bulk_data,
344-
ignore_status=ignore_status,
345-
raise_on_exception=raise_on_exception,
346-
raise_on_error=raise_on_error,
347-
)
348-
else:
349-
gen = _process_bulk_chunk_success(
350-
resp=resp.body,
351-
bulk_data=bulk_data,
352-
ignore_status=ignore_status,
353-
raise_on_error=raise_on_error,
354-
)
355-
yield from gen
334+
with client._otel.recover_parent_context():
335+
if isinstance(ignore_status, int):
336+
ignore_status = (ignore_status,)
337+
338+
try:
339+
# send the actual request
340+
resp = client.bulk(*args, operations=bulk_actions, **kwargs) # type: ignore[arg-type]
341+
except ApiError as e:
342+
gen = _process_bulk_chunk_error(
343+
error=e,
344+
bulk_data=bulk_data,
345+
ignore_status=ignore_status,
346+
raise_on_exception=raise_on_exception,
347+
raise_on_error=raise_on_error,
348+
)
349+
else:
350+
gen = _process_bulk_chunk_success(
351+
resp=resp.body,
352+
bulk_data=bulk_data,
353+
ignore_status=ignore_status,
354+
raise_on_error=raise_on_error,
355+
)
356+
yield from gen
356357

357358

358359
def streaming_bulk(

0 commit comments

Comments
 (0)