Skip to content

[DE-544] Retriable batch reads #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Create ArangoDB Docker container
run: >
docker create --name arango -p 8529:8529 -e ARANGO_ROOT_PASSWORD=passwd -v "$(pwd)/tests/static/":/tests/static
arangodb/arangodb:3.10.6 --server.jwt-secret-keyfile=/tests/static/keyfile
arangodb/arangodb:3.10.9 --server.jwt-secret-keyfile=/tests/static/keyfile

- name: Start ArangoDB Docker container
run: docker start arango
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
main
----

* Added allow_retry query parameter, making it possible to retry fetching the latest batch from a cursor.

* Added OverloadControlDatabase, enabling the client to react effectively to potential server overloads.

* The db.version() now has a new optional parameter "details" that can be used to return additional information about
Expand Down
8 changes: 8 additions & 0 deletions arango/aql.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def execute(
max_runtime: Optional[Number] = None,
fill_block_cache: Optional[bool] = None,
allow_dirty_read: bool = False,
allow_retry: bool = False,
) -> Result[Cursor]:
"""Execute the query and return the result cursor.

Expand Down Expand Up @@ -369,6 +370,9 @@ def execute(
:type fill_block_cache: bool
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:param allow_retry: Make it possible to retry fetching the latest batch
from a cursor.
:type allow_retry: bool
:return: Result cursor.
:rtype: arango.cursor.Cursor
:raise arango.exceptions.AQLQueryExecuteError: If execute fails.
Expand Down Expand Up @@ -415,6 +419,10 @@ def execute(
if max_runtime is not None:
options["maxRuntime"] = max_runtime

# New in 3.11
if allow_retry is not None:
options["allowRetry"] = allow_retry

if options:
data["options"] = options
data.update(options)
Expand Down
40 changes: 39 additions & 1 deletion arango/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Cursor:
"_warnings",
"_has_more",
"_batch",
"_next_batch_id",
]

def __init__(
Expand All @@ -57,6 +58,7 @@ def __init__(
self._stats = None
self._profile = None
self._warnings = None
self._next_batch_id: Optional[str] = None
self._update(init_data)

def __iter__(self) -> "Cursor":
Expand Down Expand Up @@ -99,6 +101,11 @@ def _update(self, data: Json) -> Json:
self._cached = data["cached"]
result["cached"] = data["cached"]

# New in 3.11
if "nextBatchId" in data:
self._next_batch_id = data["nextBatchId"]
result["next_batch_id"] = data["nextBatchId"]

self._has_more = bool(data["hasMore"])
result["has_more"] = data["hasMore"]

Expand Down Expand Up @@ -138,6 +145,11 @@ def _update(self, data: Json) -> Json:
stats["cacheHits"] = stats.pop("cacheHits")
if "cacheMisses" in stats:
stats["cacheMisses"] = stats.pop("cacheMisses")

# New in 3.11
if "peakMemoryUsage" in stats:
stats["peak_memory_usage"] = stats.pop("peakMemoryUsage")

self._stats = stats
result["statistics"] = stats

Expand Down Expand Up @@ -268,7 +280,33 @@ def fetch(self) -> Json:
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
request = Request(method="put", endpoint=f"/_api/{self._type}/{self._id}")
request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}")
resp = self._conn.send_request(request)

if not resp.is_success:
raise CursorNextError(resp, request)

return self._update(resp.body)

def retry(self) -> Json:
"""Retry fetching the next batch from server and update the cursor.

Available only if the ``allowRetry`` query options is enabled.
Introduced in 3.11.

:return: New batch details.
:rtype: dict
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
if self._id is None:
raise CursorStateError("nextBatchId not set")
request = Request(
method="post",
endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}",
)
resp = self._conn.send_request(request)

if not resp.is_success:
Expand Down
9 changes: 7 additions & 2 deletions docs/cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ number of items in the result set may or may not be known in advance.
'FOR doc IN students FILTER doc.age > @val RETURN doc',
bind_vars={'val': 17},
batch_size=2,
count=True
count=True,
allow_retry=True
)

# Get the cursor ID.
Expand Down Expand Up @@ -72,7 +73,11 @@ number of items in the result set may or may not be known in advance.
cursor.pop()

# Fetch the next batch and add them to the cursor object.
cursor.fetch()
try:
cursor.fetch()
except CursorNextError:
# Retry fetching the latest batch from the cursor.
cursor.retry()

# Delete the cursor from the server.
cursor.close()
Expand Down
8 changes: 4 additions & 4 deletions tester.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# 4. Runs all python-arango tests, including enterprise tests.

# Usage:
# ./start.sh [all|single|cluster] [all|community|enterprise] [version] ["notest"]
# ./tester.sh [all|single|cluster] [all|community|enterprise] [version] ["notest"]

setup="${1:-all}"
if [[ "$setup" != "all" && "$setup" != "single" && "$setup" != "cluster" ]]; then
Expand All @@ -21,10 +21,10 @@ if [[ "$tests" != "all" && "$tests" != "community" && "$tests" != "enterprise" ]
exit 1
fi

# 3.11.0
# 3.10.6
# 3.11.1
# 3.10.9
# 3.9.9
version="${3:-3.11.0}"
version="${3:-3.11.1}"

if [[ -n "$4" && "$4" != "notest" ]]; then
echo "Invalid argument. Use 'notest' to only start the docker container, without running the tests."
Expand Down
80 changes: 80 additions & 0 deletions tests/test_cursor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from packaging import version

from arango.exceptions import (
CursorCloseError,
Expand Down Expand Up @@ -262,6 +263,85 @@ def test_cursor_manual_fetch_and_pop(db, col, docs):
assert err.value.message == "current batch is empty"


def test_cursor_retry_disabled(db, col, db_version):
cursor = db.aql.execute(
f"FOR d IN {col.name} SORT d._key RETURN d",
count=True,
batch_size=1,
ttl=1000,
optimizer_rules=["+all"],
profile=True,
allow_retry=False,
)
result = cursor.fetch()
assert result["id"] == cursor.id
cursor._next_batch_id = "2"

if db_version >= version.parse("3.11.0"):
with pytest.raises(CursorNextError) as err:
cursor.retry()
assert err.value.message == "batch id not found"

assert cursor.close(ignore_missing=True)


def test_cursor_retry(db, col, docs, db_version):
cursor = db.aql.execute(
f"FOR d IN {col.name} SORT d._key RETURN d",
count=True,
batch_size=1,
ttl=1000,
optimizer_rules=["+all"],
profile=True,
allow_retry=True,
)

assert cursor.count() == len(docs)
doc = cursor.pop()
assert clean_doc(doc) == docs[0]
assert cursor.has_more()

result = cursor.fetch()
assert result["id"] == cursor.id
if db_version >= version.parse("3.11.0"):
assert result["next_batch_id"] == "3"
doc = cursor.pop()
assert clean_doc(doc) == docs[1]
assert cursor.empty()

# Decrease the next batch ID as if the previous fetch failed
if db_version >= version.parse("3.11.0"):
cursor._next_batch_id = "2"
result = cursor.retry()
assert result["id"] == cursor.id
assert result["next_batch_id"] == "3"
doc = cursor.pop()
assert clean_doc(doc) == docs[1]
assert cursor.empty()

# Fetch the next batches normally
for batch in range(2, 5):
result = cursor.fetch()
assert result["id"] == cursor.id
if db_version >= version.parse("3.11.0"):
assert result["next_batch_id"] == str(batch + 2)
doc = cursor.pop()
assert clean_doc(doc) == docs[batch]

result = cursor.fetch()
assert not cursor.has_more()
assert "id" not in result
assert "next_batch_id" not in result
doc = cursor.pop()
assert clean_doc(doc) == docs[-1]

if db_version >= version.parse("3.11.0"):
assert cursor.close()
else:
with pytest.raises(CursorCloseError):
cursor.close()


def test_cursor_no_count(db, col):
cursor = db.aql.execute(
f"FOR d IN {col.name} SORT d._key RETURN d",
Expand Down