From 7b54d49a58ded4bf0746393e994d5127a768a815 Mon Sep 17 00:00:00 2001 From: David Avikasis Date: Thu, 29 Aug 2019 01:31:07 +0300 Subject: [PATCH] Allow retries for statuses other than 429 in streaming_bulk --- elasticsearch/helpers/actions.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index a32fefdba..d7767295f 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -158,6 +158,10 @@ def _process_bulk_chunk( raise BulkIndexError("%i document(s) failed to index." % len(errors), errors) +def _retry_for_status(status): + if status == 429: return True + return False + def streaming_bulk( client, actions, @@ -165,6 +169,7 @@ def streaming_bulk( max_chunk_bytes=100 * 1024 * 1024, raise_on_error=True, expand_action_callback=expand_action, + retry_for_status_callback=_retry_for_status, raise_on_exception=True, max_retries=0, initial_backoff=2, @@ -198,6 +203,9 @@ def streaming_bulk( :arg expand_action_callback: callback executed on each action passed in, should return a tuple containing the action line and the data line (`None` if data line should be omitted). + :arg retry_for_status_callback: callback executed on each item's status, + should return a True if the status require a retry and False if not. + (if `None` is specified only status 429 will retry). :arg max_retries: maximum number of times a document will be retried when ``429`` is received, set to 0 (default) for no retries on ``429`` :arg initial_backoff: number of seconds we should wait before the first @@ -233,12 +241,12 @@ def streaming_bulk( if not ok: action, info = info.popitem() - # retry if retries enabled, we get 429, and we are not - # in the last attempt + # retry if retries enabled, we are not in the last attempt, + # and we get 429 (or retry_for_status_callback is true) if ( max_retries - and info["status"] == 429 and (attempt + 1) <= max_retries + and retry_for_status_callback(info["status"]) ): # _process_bulk_chunk expects strings so we need to # re-serialize the data @@ -252,8 +260,8 @@ def streaming_bulk( yield ok, info except TransportError as e: - # suppress 429 errors since we will retry them - if attempt == max_retries or e.status_code != 429: + # suppress 429 errors (or any status which retry_for_status_callback is true for) since we will retry them + if attempt == max_retries or not retry_for_status_callback(e.status_code): raise else: if not to_retry: