Skip to content

[DE-544] Adjusting allowRetry implementation #255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions arango/aql.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def execute(
development to catch issues early. If set to False, warnings are
returned with the query result. There is a server configuration
option "--query.fail-on-warning" for setting the default value for
this behaviour so it does not need to be set per-query.
this behaviour, so it does not need to be set per-query.
:type fail_on_warning: bool
:param profile: Return additional profiling details in the cursor,
unless the query cache is used.
Expand Down Expand Up @@ -437,7 +437,7 @@ def execute(
def response_handler(resp: Response) -> Cursor:
if not resp.is_success:
raise AQLQueryExecuteError(resp, request)
return Cursor(self._conn, resp.body)
return Cursor(self._conn, resp.body, allow_retry=allow_retry)

return self._execute(request, response_handler)

Expand Down
42 changes: 15 additions & 27 deletions arango/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class Cursor:
:type init_data: dict
:param cursor_type: Cursor type ("cursor" or "export").
:type cursor_type: str
:param allow_retry: If set to True, the cursor will always attempt to fetch
the latest batch from server even if the previous attempt failed.
This option is only available for server versions 3.11 and above.
:type allow_retry: bool
"""

__slots__ = [
Expand All @@ -41,16 +45,19 @@ class Cursor:
"_has_more",
"_batch",
"_next_batch_id",
"_allow_retry",
]

def __init__(
self,
connection: BaseConnection,
init_data: Json,
cursor_type: str = "cursor",
allow_retry: bool = False,
) -> None:
self._conn = connection
self._type = cursor_type
self._allow_retry = allow_retry
self._batch: Deque[Any] = deque()
self._id = None
self._count: Optional[int] = None
Expand Down Expand Up @@ -103,8 +110,10 @@ def _update(self, data: Json) -> Json:

# New in 3.11
if "nextBatchId" in data:
self._next_batch_id = data["nextBatchId"]
result["next_batch_id"] = data["nextBatchId"]
# This is only available for server versions 3.11 and above.
# Currently, we are testing against 3.10.9
self._next_batch_id = data["nextBatchId"] # pragma: no cover
result["next_batch_id"] = data["nextBatchId"] # pragma: no cover

self._has_more = bool(data["hasMore"])
result["has_more"] = data["hasMore"]
Expand Down Expand Up @@ -280,33 +289,12 @@ def fetch(self) -> Json:
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}")
resp = self._conn.send_request(request)

if not resp.is_success:
raise CursorNextError(resp, request)

return self._update(resp.body)

def retry(self) -> Json:
"""Retry fetching the next batch from server and update the cursor.
endpoint = f"/_api/{self._type}/{self._id}"
if self._allow_retry and self._next_batch_id is not None:
endpoint += f"/{self._next_batch_id}" # pragma: no cover

Available only if the ``allowRetry`` query options is enabled.
Introduced in 3.11.

:return: New batch details.
:rtype: dict
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
if self._id is None:
raise CursorStateError("nextBatchId not set")
request = Request(
method="post",
endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}",
)
request = Request(method="post", endpoint=endpoint)
resp = self._conn.send_request(request)

if not resp.is_success:
Expand Down
60 changes: 53 additions & 7 deletions docs/cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ number of items in the result set may or may not be known in advance.
'FOR doc IN students FILTER doc.age > @val RETURN doc',
bind_vars={'val': 17},
batch_size=2,
count=True,
allow_retry=True
count=True
)

# Get the cursor ID.
Expand Down Expand Up @@ -73,11 +72,7 @@ number of items in the result set may or may not be known in advance.
cursor.pop()

# Fetch the next batch and add them to the cursor object.
try:
cursor.fetch()
except CursorNextError:
# Retry fetching the latest batch from the cursor.
cursor.retry()
cursor.fetch()

# Delete the cursor from the server.
cursor.close()
Expand Down Expand Up @@ -121,3 +116,54 @@ instead.
cursor.fetch()
while not cursor.empty(): # Pop until nothing is left on the cursor.
cursor.pop()

With ArangoDB 3.11.0 or higher, you can also use the `allow_retry`
parameter of :func:`arango.aql.AQL.execute` to automatically retry
the request if the cursor encountered any issues during the previous
fetch operation. Note that this feature causes the server to cache the
last batch. To allow re-fetching of the very last batch of the query,
the server cannot automatically delete the cursor. Once you have successfully
received the last batch, you should call :func:`arango.cursor.Cursor.close`.

**Example:**

.. code-block:: python

from arango import ArangoClient

# Initialize the ArangoDB client.
client = ArangoClient()

# Connect to "test" database as root user.
db = client.db('test', username='root', password='passwd')

# Set up some test data to query against.
db.collection('students').insert_many([
{'_key': 'Abby', 'age': 22},
{'_key': 'John', 'age': 18},
{'_key': 'Mary', 'age': 21},
{'_key': 'Suzy', 'age': 23},
{'_key': 'Dave', 'age': 20}
])

# Execute an AQL query which returns a cursor object.
cursor = db.aql.execute(
'FOR doc IN students FILTER doc.age > @val RETURN doc',
bind_vars={'val': 17},
batch_size=2,
count=True,
allow_retry=True
)

while cursor.has_more():
try:
cursor.fetch()
except ConnectionError:
# Retry the request.
continue

while not cursor.empty():
cursor.pop()

# Delete the cursor from the server.
cursor.close()
25 changes: 18 additions & 7 deletions tests/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def test_cursor_manual_fetch_and_pop(db, col, docs):
assert err.value.message == "current batch is empty"


def test_cursor_retry_disabled(db, col, db_version):
def test_cursor_retry_disabled(db, col, docs, db_version):
cursor = db.aql.execute(
f"FOR d IN {col.name} SORT d._key RETURN d",
count=True,
Expand All @@ -275,12 +275,17 @@ def test_cursor_retry_disabled(db, col, db_version):
)
result = cursor.fetch()
assert result["id"] == cursor.id
cursor._next_batch_id = "2"

if db_version >= version.parse("3.11.0"):
with pytest.raises(CursorNextError) as err:
cursor.retry()
assert err.value.message == "batch id not found"
while not cursor.empty():
cursor.pop()

# The next batch ID should have no effect
cursor._next_batch_id = "2"
result = cursor.fetch()
if db_version >= version.parse("3.11.1"):
assert result["next_batch_id"] == "4"
doc = cursor.pop()
assert clean_doc(doc) == docs[2]

assert cursor.close(ignore_missing=True)

Expand Down Expand Up @@ -312,7 +317,7 @@ def test_cursor_retry(db, col, docs, db_version):
# Decrease the next batch ID as if the previous fetch failed
if db_version >= version.parse("3.11.0"):
cursor._next_batch_id = "2"
result = cursor.retry()
result = cursor.fetch()
assert result["id"] == cursor.id
assert result["next_batch_id"] == "3"
doc = cursor.pop()
Expand All @@ -335,6 +340,12 @@ def test_cursor_retry(db, col, docs, db_version):
doc = cursor.pop()
assert clean_doc(doc) == docs[-1]

if db_version >= version.parse("3.11.0"):
# We should be able to fetch the last batch again
cursor.fetch()
doc = cursor.pop()
assert clean_doc(doc) == docs[-1]

if db_version >= version.parse("3.11.0"):
assert cursor.close()
else:
Expand Down