Skip to content

Commit 87133b1

Browse files
author
Alisson Claudino
committed
fix: forward OTEL context to subthreads in parallel bulk calls
1 parent 3073f9c commit 87133b1

File tree

3 files changed

+45
-23
lines changed

3 files changed

+45
-23
lines changed

elasticsearch/_otel.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
from typing import Literal
2626

2727
try:
28+
from opentelemetry import context as otel_context
2829
from opentelemetry import trace
30+
from opentelemetry.trace.propagation.tracecontext import (
31+
TraceContextTextMapPropagator,
32+
)
2933

3034
_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
3135
except ModuleNotFoundError:
@@ -44,6 +48,8 @@
4448

4549

4650
class OpenTelemetry:
51+
current_context = None
52+
4753
def __init__(
4854
self,
4955
enabled: bool | None = None,
@@ -78,6 +84,8 @@ def span(
7884

7985
span_name = endpoint_id or method
8086
with self.tracer.start_as_current_span(span_name) as otel_span:
87+
self.current_context = {}
88+
TraceContextTextMapPropagator().inject(self.current_context)
8189
otel_span.set_attribute("http.request.method", method)
8290
otel_span.set_attribute("db.system", "elasticsearch")
8391
if endpoint_id is not None:
@@ -90,3 +98,16 @@ def span(
9098
endpoint_id=endpoint_id,
9199
body_strategy=self.body_strategy,
92100
)
101+
102+
@contextlib.contextmanager
103+
def recover_parent_context(self):
104+
token = None
105+
if self.current_context:
106+
otel_parent_ctx = TraceContextTextMapPropagator().extract(
107+
carrier=self.current_context
108+
)
109+
token = otel_context.attach(otel_parent_ctx)
110+
yield
111+
112+
if token:
113+
otel_context.detach(token)

elasticsearch/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
__versionstr__ = "8.14.0"
18+
__versionstr__ = "8.14.1-alpha"

elasticsearch/helpers/actions.py

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -331,28 +331,29 @@ def _process_bulk_chunk(
331331
"""
332332
Send a bulk request to elasticsearch and process the output.
333333
"""
334-
if isinstance(ignore_status, int):
335-
ignore_status = (ignore_status,)
336-
337-
try:
338-
# send the actual request
339-
resp = client.bulk(*args, operations=bulk_actions, **kwargs) # type: ignore[arg-type]
340-
except ApiError as e:
341-
gen = _process_bulk_chunk_error(
342-
error=e,
343-
bulk_data=bulk_data,
344-
ignore_status=ignore_status,
345-
raise_on_exception=raise_on_exception,
346-
raise_on_error=raise_on_error,
347-
)
348-
else:
349-
gen = _process_bulk_chunk_success(
350-
resp=resp.body,
351-
bulk_data=bulk_data,
352-
ignore_status=ignore_status,
353-
raise_on_error=raise_on_error,
354-
)
355-
yield from gen
334+
with client._otel.recover_parent_context():
335+
if isinstance(ignore_status, int):
336+
ignore_status = (ignore_status,)
337+
338+
try:
339+
# send the actual request
340+
resp = client.bulk(*args, operations=bulk_actions, **kwargs) # type: ignore[arg-type]
341+
except ApiError as e:
342+
gen = _process_bulk_chunk_error(
343+
error=e,
344+
bulk_data=bulk_data,
345+
ignore_status=ignore_status,
346+
raise_on_exception=raise_on_exception,
347+
raise_on_error=raise_on_error,
348+
)
349+
else:
350+
gen = _process_bulk_chunk_success(
351+
resp=resp.body,
352+
bulk_data=bulk_data,
353+
ignore_status=ignore_status,
354+
raise_on_error=raise_on_error,
355+
)
356+
yield from gen
356357

357358

358359
def streaming_bulk(

0 commit comments

Comments
 (0)