
Commit 25d3705

[8.0] Migrate helpers to use .options(...)
Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
1 parent bc65091 commit 25d3705
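
For orientation, a rough sketch of the pattern this commit moves the helpers toward: settings that used to travel as per-call keyword arguments (request_timeout, ignore, auth, headers) are applied with .options(...), which returns a configured copy of the client. This is a hedged example, not part of the commit; the host and index name are placeholders:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")  # placeholder host

    # Before: per-call transport kwargs, e.g.
    #   client.search(index="my-index", body={...}, request_timeout=30, ignore=404)

    # After: derive a configured client once, then call the API on it
    scoped = client.options(request_timeout=30, ignore_status=404)
    resp = scoped.search(index="my-index", query={"match_all": {}})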

11 files changed, +473 -255 lines changed

elasticsearch/_async/client/__init__.py

Lines changed: 14 additions & 0 deletions
@@ -168,6 +168,7 @@ def __init__(
         sniffer_timeout=DEFAULT,
         sniff_on_connection_fail=DEFAULT,
         http_auth=DEFAULT,
+        maxsize=DEFAULT,
         # Internal use only
         _transport: Optional[AsyncTransport] = None,
     ) -> None:
@@ -226,6 +227,19 @@ def __init__(
             )
             sniff_on_node_failure = sniff_on_connection_fail

+        if maxsize is not DEFAULT:
+            if connections_per_node is not DEFAULT:
+                raise ValueError(
+                    "Can't specify both 'maxsize' and 'connections_per_node', "
+                    "instead only specify 'connections_per_node'"
+                )
+            warnings.warn(
+                "The 'maxsize' parameter is deprecated in favor of 'connections_per_node'",
+                category=DeprecationWarning,
+                stacklevel=2,
+            )
+            connections_per_node = maxsize
+
         # Setting min_delay_between_sniffing=True implies sniff_before_requests=True
         if min_delay_between_sniffing is not DEFAULT:
             sniff_before_requests = True
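
What the shim above means for callers, as a hedged sketch (host is a placeholder): 'maxsize' still works but emits a DeprecationWarning and is forwarded to 'connections_per_node'; passing both raises ValueError.

    import warnings
    from elasticsearch import AsyncElasticsearch

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        client = AsyncElasticsearch("http://localhost:9200", maxsize=25)  # deprecated spelling
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)

    # Preferred spelling going forward:
    client = AsyncElasticsearch("http://localhost:9200", connections_per_node=25)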

elasticsearch/_async/client/_base.py

Lines changed: 4 additions & 1 deletion
@@ -233,6 +233,7 @@ def _default_sniffed_node_callback(
 class BaseClient:
     def __init__(self, _transport: AsyncTransport) -> None:
         self._transport = _transport
+        self._client_meta: Union[DefaultType, Tuple[Tuple[str, str], ...]] = DEFAULT
         self._headers = HttpHeaders({"content-type": "application/json"})
         self._request_timeout: Union[DefaultType, Optional[float]] = DEFAULT
         self._ignore_status: Union[DefaultType, Collection[int]] = DEFAULT
@@ -274,6 +275,7 @@ async def _perform_request(
             max_retries=self._max_retries,
             retry_on_status=self._retry_on_status,
             retry_on_timeout=self._retry_on_timeout,
+            client_meta=self._client_meta,
         )

         # HEAD with a 404 is returned as a normal response
@@ -320,11 +322,12 @@ async def _perform_request(
             warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (
                 warning_header,
             )
+            stacklevel = warn_stacklevel()
             for warning_message in warning_messages:
                 warnings.warn(
                     warning_message,
                     category=ElasticsearchWarning,
-                    stacklevel=warn_stacklevel(),
+                    stacklevel=stacklevel,
                 )

         if method == "HEAD":
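
The warn_stacklevel() hoist is behavior-preserving: the call depth does not change between loop iterations, so the value can be computed once and reused. A standalone sketch of that pattern (the names here are illustrative, not from the client):

    import warnings

    def warn_all(messages):
        # compute the stacklevel once; every warning then points at warn_all's caller
        stacklevel = 2  # 1 = this frame, 2 = the caller
        for message in messages:
            warnings.warn(message, category=UserWarning, stacklevel=stacklevel)

    warn_all(["this API is deprecated", "that parameter is deprecated"])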

elasticsearch/_async/helpers.py

Lines changed: 47 additions & 32 deletions
@@ -66,7 +66,7 @@ async def _process_bulk_chunk(

     try:
         # send the actual request
-        resp = await client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
+        resp = await client.bulk(*args, body=bulk_actions, **kwargs)
     except TransportError as e:
         gen = _process_bulk_chunk_error(
             error=e,
@@ -163,6 +163,9 @@ async def async_streaming_bulk(
     :arg ignore_status: list of HTTP status code that you want to ignore
     """

+    client = client.options()
+    client._client_meta = (("h", "bp"),)
+
     async def map_actions():
         async for item in aiter(actions):
             yield expand_action_callback(item)
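
Because the helper now starts from client.options() (a copy) before tagging _client_meta, the caller's client object is left untouched. A hedged usage sketch; the host, index, and documents are placeholders:

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_streaming_bulk

    async def index_docs(docs):
        client = AsyncElasticsearch("http://localhost:9200")  # placeholder
        # each doc becomes one bulk action; results are yielded per item
        async for ok, item in async_streaming_bulk(client, docs, index="my-index"):
            if not ok:
                print("failed:", item)
        await client.close()
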
@@ -333,35 +336,52 @@ async def async_scan(
         query = query.copy() if query else {}
         query["sort"] = "_doc"

-    # Grab options that should be propagated to every
-    # API call within this helper instead of just 'search()'
-    transport_kwargs = {}
-    for key in ("headers", "api_key", "http_auth"):
-        if key in kwargs:
-            transport_kwargs[key] = kwargs[key]
-
-    # If the user is using 'scroll_kwargs' we want
-    # to propagate there too, but to not break backwards
-    # compatibility we'll not override anything already given.
-    if scroll_kwargs is not None and transport_kwargs:
-        for key, val in transport_kwargs.items():
-            scroll_kwargs.setdefault(key, val)
+    def pop_transport_kwargs(kw):
+        # Grab options that should be propagated to every
+        # API call within this helper instead of just 'search()'
+        transport_kwargs = {}
+        for key in ("headers", "api_key", "http_auth", "basic_auth", "bearer_auth"):
+            try:
+                value = kw.pop(key)
+                if key == "http_auth":
+                    key = "basic_auth"
+                transport_kwargs[key] = value
+            except KeyError:
+                pass
+        return transport_kwargs
+
+    client = client.options(
+        request_timeout=request_timeout, **pop_transport_kwargs(kwargs)
+    )
+    client._client_meta = (("h", "s"),)

     # initial search
-    resp = await client.search(
-        body=query, scroll=scroll, size=size, request_timeout=request_timeout, **kwargs
-    )
-    scroll_id = resp.get("_scroll_id")
+    search_kwargs = query.copy() if query else {}
+    search_kwargs.update(kwargs)
+    search_kwargs["scroll"] = scroll
+    search_kwargs["size"] = size

     try:
-        while scroll_id and resp["hits"]["hits"]:
-            for hit in resp["hits"]["hits"]:
+        resp = await client.search(**search_kwargs)
+    except TypeError:
+        resp = await client.search(body=query, scroll=scroll, size=size, **kwargs)
+
+    scroll_id = resp.raw.get("_scroll_id")
+    scroll_transport_kwargs = pop_transport_kwargs(scroll_kwargs)
+    if scroll_transport_kwargs:
+        scroll_client = client.options(**scroll_transport_kwargs)
+    else:
+        scroll_client = client
+
+    try:
+        while scroll_id and resp.raw["hits"]["hits"]:
+            for hit in resp.raw["hits"]["hits"]:
                 yield hit

             # Default to 0 if the value isn't included in the response
-            shards_successful = resp["_shards"].get("successful", 0)
-            shards_skipped = resp["_shards"].get("skipped", 0)
-            shards_total = resp["_shards"].get("total", 0)
+            shards_successful = resp.raw["_shards"].get("successful", 0)
+            shards_skipped = resp.raw["_shards"].get("skipped", 0)
+            shards_total = resp.raw["_shards"].get("total", 0)

             # check if we have any errors
             if (shards_successful + shards_skipped) < shards_total:
@@ -382,19 +402,14 @@ async def async_scan(
                         shards_total,
                     ),
                 )
-            resp = await client.scroll(
-                body={"scroll_id": scroll_id, "scroll": scroll}, **scroll_kwargs
+            resp = await scroll_client.scroll(
+                scroll_id=scroll_id, scroll=scroll, **scroll_kwargs
             )
-            scroll_id = resp.get("_scroll_id")
+            scroll_id = resp.raw.get("_scroll_id")

     finally:
         if scroll_id and clear_scroll:
-            await client.clear_scroll(
-                body={"scroll_id": [scroll_id]},
-                **transport_kwargs,
-                ignore=(404,),
-                params={"__elastic_client_meta": (("h", "s"),)},
-            )
+            await client.options(ignore_status=404).clear_scroll(scroll_id=scroll_id)


 async def async_reindex(
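
One user-visible effect of pop_transport_kwargs above: auth passed to the helper is lifted onto a derived client via .options(), and the legacy 'http_auth' spelling is remapped to 'basic_auth'. A hedged sketch; host, credentials, and index are placeholders:

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_scan

    async def dump_ids():
        client = AsyncElasticsearch("http://localhost:9200")  # placeholder
        # 'http_auth' is popped by the helper and applied as basic_auth on a
        # derived client rather than being forwarded to search() itself
        async for hit in async_scan(
            client,
            index="my-index",
            query={"query": {"match_all": {}}},
            http_auth=("user", "password"),
        ):
            print(hit["_id"])
        await client.close()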

elasticsearch/_sync/client/__init__.py

Lines changed: 14 additions & 0 deletions
@@ -168,6 +168,7 @@ def __init__(
         sniffer_timeout=DEFAULT,
         sniff_on_connection_fail=DEFAULT,
         http_auth=DEFAULT,
+        maxsize=DEFAULT,
         # Internal use only
         _transport: Optional[Transport] = None,
     ) -> None:
@@ -226,6 +227,19 @@ def __init__(
             )
             sniff_on_node_failure = sniff_on_connection_fail

+        if maxsize is not DEFAULT:
+            if connections_per_node is not DEFAULT:
+                raise ValueError(
+                    "Can't specify both 'maxsize' and 'connections_per_node', "
+                    "instead only specify 'connections_per_node'"
+                )
+            warnings.warn(
+                "The 'maxsize' parameter is deprecated in favor of 'connections_per_node'",
+                category=DeprecationWarning,
+                stacklevel=2,
+            )
+            connections_per_node = maxsize
+
         # Setting min_delay_between_sniffing=True implies sniff_before_requests=True
         if min_delay_between_sniffing is not DEFAULT:
             sniff_before_requests = True

elasticsearch/_sync/client/_base.py

Lines changed: 4 additions & 1 deletion
@@ -233,6 +233,7 @@ def _default_sniffed_node_callback(
 class BaseClient:
     def __init__(self, _transport: Transport) -> None:
         self._transport = _transport
+        self._client_meta: Union[DefaultType, Tuple[Tuple[str, str], ...]] = DEFAULT
         self._headers = HttpHeaders({"content-type": "application/json"})
         self._request_timeout: Union[DefaultType, Optional[float]] = DEFAULT
         self._ignore_status: Union[DefaultType, Collection[int]] = DEFAULT
@@ -274,6 +275,7 @@ def _perform_request(
             max_retries=self._max_retries,
             retry_on_status=self._retry_on_status,
             retry_on_timeout=self._retry_on_timeout,
+            client_meta=self._client_meta,
         )

         # HEAD with a 404 is returned as a normal response
@@ -320,11 +322,12 @@ def _perform_request(
             warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (
                 warning_header,
             )
+            stacklevel = warn_stacklevel()
             for warning_message in warning_messages:
                 warnings.warn(
                     warning_message,
                     category=ElasticsearchWarning,
-                    stacklevel=warn_stacklevel(),
+                    stacklevel=stacklevel,
                 )

         if method == "HEAD":

elasticsearch/_sync/client/utils.py

Lines changed: 0 additions & 1 deletion
@@ -181,7 +181,6 @@ def cloud_id_to_node_configs(cloud_id: str) -> List[NodeConfig]:
             host=host,
             port=port,
             http_compress=True,
-            # TODO: Set TLSv1.2+
         )
     ]

elasticsearch/helpers/actions.py

Lines changed: 46 additions & 37 deletions
@@ -169,7 +169,7 @@ def _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error=T

     # go through request-response pairs and detect failures
     for data, (op_type, item) in zip(
-        bulk_data, map(methodcaller("popitem"), resp["items"])
+        bulk_data, map(methodcaller("popitem"), resp.raw["items"])
     ):
         status_code = item.get("status", 500)

@@ -232,14 +232,12 @@ def _process_bulk_chunk(
     """
     Send a bulk request to elasticsearch and process the output.
     """
-    kwargs = _add_helper_meta_to_kwargs(kwargs, "bp")
-
     if not isinstance(ignore_status, (list, tuple)):
         ignore_status = (ignore_status,)

     try:
         # send the actual request
-        resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
+        resp = client.bulk(*args, body=bulk_actions, **kwargs)
     except TransportError as e:
         gen = _process_bulk_chunk_error(
             error=e,
@@ -315,6 +313,9 @@ def streaming_bulk(
     :arg yield_ok: if set to False will skip successful documents in the output
     :arg ignore_status: list of HTTP status code that you want to ignore
     """
+    client = client.options()
+    client._client_meta = (("h", "bp"),)
+
     actions = map(expand_action_callback, actions)
     serializer = client.transport.serializers.get_serializer("application/json")

@@ -542,40 +543,53 @@ def scan(

     """
     scroll_kwargs = scroll_kwargs or {}
-    _add_helper_meta_to_kwargs(scroll_kwargs, "s")
-
     if not preserve_order:
         query = query.copy() if query else {}
         query["sort"] = "_doc"

-    # Grab options that should be propagated to every
-    # API call within this helper instead of just 'search()'
-    transport_kwargs = {}
-    for key in ("headers", "api_key", "http_auth"):
-        if key in kwargs:
-            transport_kwargs[key] = kwargs[key]
-
-    # If the user is using 'scroll_kwargs' we want
-    # to propagate there too, but to not break backwards
-    # compatibility we'll not override anything already given.
-    if scroll_kwargs is not None and transport_kwargs:
-        for key, val in transport_kwargs.items():
-            scroll_kwargs.setdefault(key, val)
+    def pop_transport_kwargs(kw):
+        # Grab options that should be propagated to every
+        # API call within this helper instead of just 'search()'
+        transport_kwargs = {}
+        for key in ("headers", "api_key", "http_auth", "basic_auth", "bearer_auth"):
+            try:
+                value = kw.pop(key)
+                if key == "http_auth":
+                    key = "basic_auth"
+                transport_kwargs[key] = value
+            except KeyError:
+                pass
+        return transport_kwargs
+
+    client = client.options(
+        request_timeout=request_timeout, **pop_transport_kwargs(kwargs)
+    )
+    client._client_meta = (("h", "s"),)

     # initial search
-    resp = client.search(
-        body=query, scroll=scroll, size=size, request_timeout=request_timeout, **kwargs
-    )
-    scroll_id = resp.get("_scroll_id")
+    search_kwargs = query.copy() if query else {}
+    search_kwargs.update(kwargs)
+    search_kwargs["scroll"] = scroll
+    search_kwargs["size"] = size
+    try:
+        resp = client.search(**search_kwargs)
+    except TypeError:
+        resp = client.search(body=query, scroll=scroll, size=size, **kwargs)
+    scroll_id = resp.raw.get("_scroll_id")
+    scroll_transport_kwargs = pop_transport_kwargs(scroll_kwargs)
+    if scroll_transport_kwargs:
+        scroll_client = client.options(**scroll_transport_kwargs)
+    else:
+        scroll_client = client

     try:
-        while scroll_id and resp["hits"]["hits"]:
-            yield from resp["hits"]["hits"]
+        while scroll_id and resp.raw["hits"]["hits"]:
+            yield from resp.raw["hits"]["hits"]

             # Default to 0 if the value isn't included in the response
-            shards_successful = resp["_shards"].get("successful", 0)
-            shards_skipped = resp["_shards"].get("skipped", 0)
-            shards_total = resp["_shards"].get("total", 0)
+            shards_successful = resp.raw["_shards"].get("successful", 0)
+            shards_skipped = resp.raw["_shards"].get("skipped", 0)
+            shards_total = resp.raw["_shards"].get("total", 0)

             # check if we have any errors
             if (shards_successful + shards_skipped) < shards_total:
@@ -596,19 +610,14 @@ def scan(
                         shards_total,
                     ),
                 )
-            resp = client.scroll(
-                body={"scroll_id": scroll_id, "scroll": scroll}, **scroll_kwargs
+            resp = scroll_client.scroll(
+                scroll_id=scroll_id, scroll=scroll, **scroll_kwargs
             )
-            scroll_id = resp.get("_scroll_id")
+            scroll_id = resp.raw.get("_scroll_id")

     finally:
         if scroll_id and clear_scroll:
-            client.clear_scroll(
-                body={"scroll_id": [scroll_id]},
-                ignore=(404,),
-                params={"__elastic_client_meta": (("h", "s"),)},
-                **transport_kwargs,
-            )
+            client.options(ignore_status=404).clear_scroll(scroll_id=scroll_id)


 def reindex(
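
The synchronous scan() follows the same mechanics: options placed in scroll_kwargs are popped and applied through a derived scroll_client instead of being forwarded as raw keyword arguments. A hedged sketch; host, header value, and index are placeholders:

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan

    client = Elasticsearch("http://localhost:9200")  # placeholder

    hits = scan(
        client,
        index="my-index",
        query={"query": {"term": {"status": "published"}}},
        # applied to every scroll() call via scroll_client = client.options(...)
        scroll_kwargs={"headers": {"x-opaque-id": "nightly-export"}},
    )
    for hit in hits:
        print(hit["_source"])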

elasticsearch/helpers/errors.py

Lines changed: 2 additions & 5 deletions
@@ -15,16 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.

-from typing import Any, List
+from typing import Any

 from ..exceptions import ElasticsearchException


 class BulkIndexError(ElasticsearchException):
-    @property
-    def errors(self) -> List[Any]:  # type: ignore
-        """List of errors from execution of the last chunk."""
-        return self.args[1]  # type: ignore
+    pass


 class ScanError(ElasticsearchException):
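
With the 'errors' property removed, the per-item failures remain available as the exception's second positional argument, which is exactly what the property used to return. A hedged sketch; host, index, and document are placeholders:

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import BulkIndexError, bulk

    client = Elasticsearch("http://localhost:9200")  # placeholder

    try:
        bulk(client, [{"_index": "my-index", "field": "value"}])
    except BulkIndexError as exc:
        # exc.args[1] holds the list of failed items (what the removed property returned)
        for failure in exc.args[1]:
            print(failure)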
