Skip to content

Commit b82de8c

Browse files
authored
[DE-544] Retriable batch reads (#254)
* Adding allow_retry query option
* Adding allow_retry documentation
* Fixing cursor tests
* Fixing cursor docs
1 parent f330774 commit b82de8c

File tree

7 files changed

+141
-8
lines changed

7 files changed

+141
-8
lines changed

.github/workflows/build.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
- name: Create ArangoDB Docker container
3030
run: >
3131
docker create --name arango -p 8529:8529 -e ARANGO_ROOT_PASSWORD=passwd -v "$(pwd)/tests/static/":/tests/static
32-
arangodb/arangodb:3.10.6 --server.jwt-secret-keyfile=/tests/static/keyfile
32+
arangodb/arangodb:3.10.9 --server.jwt-secret-keyfile=/tests/static/keyfile
3333
3434
- name: Start ArangoDB Docker container
3535
run: docker start arango

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
main
22
----
33

4+
* Added allow_retry query parameter, making it possible to retry fetching the latest batch from a cursor.
5+
46
* Added OverloadControlDatabase, enabling the client to react effectively to potential server overloads.
57

68
* The db.version() now has a new optional parameter "details" that can be used to return additional information about

arango/aql.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ def execute(
275275
max_runtime: Optional[Number] = None,
276276
fill_block_cache: Optional[bool] = None,
277277
allow_dirty_read: bool = False,
278+
allow_retry: bool = False,
278279
) -> Result[Cursor]:
279280
"""Execute the query and return the result cursor.
280281
@@ -369,6 +370,9 @@ def execute(
369370
:type fill_block_cache: bool
370371
:param allow_dirty_read: Allow reads from followers in a cluster.
371372
:type allow_dirty_read: bool | None
373+
:param allow_retry: Make it possible to retry fetching the latest batch
374+
from a cursor.
375+
:type allow_retry: bool
372376
:return: Result cursor.
373377
:rtype: arango.cursor.Cursor
374378
:raise arango.exceptions.AQLQueryExecuteError: If execute fails.
@@ -415,6 +419,10 @@ def execute(
415419
if max_runtime is not None:
416420
options["maxRuntime"] = max_runtime
417421

422+
# New in 3.11
423+
if allow_retry is not None:
424+
options["allowRetry"] = allow_retry
425+
418426
if options:
419427
data["options"] = options
420428
data.update(options)

arango/cursor.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class Cursor:
4040
"_warnings",
4141
"_has_more",
4242
"_batch",
43+
"_next_batch_id",
4344
]
4445

4546
def __init__(
@@ -57,6 +58,7 @@ def __init__(
5758
self._stats = None
5859
self._profile = None
5960
self._warnings = None
61+
self._next_batch_id: Optional[str] = None
6062
self._update(init_data)
6163

6264
def __iter__(self) -> "Cursor":
@@ -99,6 +101,11 @@ def _update(self, data: Json) -> Json:
99101
self._cached = data["cached"]
100102
result["cached"] = data["cached"]
101103

104+
# New in 3.11
105+
if "nextBatchId" in data:
106+
self._next_batch_id = data["nextBatchId"]
107+
result["next_batch_id"] = data["nextBatchId"]
108+
102109
self._has_more = bool(data["hasMore"])
103110
result["has_more"] = data["hasMore"]
104111

@@ -138,6 +145,11 @@ def _update(self, data: Json) -> Json:
138145
stats["cacheHits"] = stats.pop("cacheHits")
139146
if "cacheMisses" in stats:
140147
stats["cacheMisses"] = stats.pop("cacheMisses")
148+
149+
# New in 3.11
150+
if "peakMemoryUsage" in stats:
151+
stats["peak_memory_usage"] = stats.pop("peakMemoryUsage")
152+
141153
self._stats = stats
142154
result["statistics"] = stats
143155

@@ -268,7 +280,33 @@ def fetch(self) -> Json:
268280
"""
269281
if self._id is None:
270282
raise CursorStateError("cursor ID not set")
271-
request = Request(method="put", endpoint=f"/_api/{self._type}/{self._id}")
283+
request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}")
284+
resp = self._conn.send_request(request)
285+
286+
if not resp.is_success:
287+
raise CursorNextError(resp, request)
288+
289+
return self._update(resp.body)
290+
291+
def retry(self) -> Json:
292+
"""Retry fetching the next batch from server and update the cursor.
293+
294+
Available only if the ``allowRetry`` query option is enabled.
295+
Introduced in 3.11.
296+
297+
:return: New batch details.
298+
:rtype: dict
299+
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
300+
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
301+
"""
302+
if self._id is None:
303+
raise CursorStateError("cursor ID not set")
304+
if self._id is None:
305+
raise CursorStateError("nextBatchId not set")
306+
request = Request(
307+
method="post",
308+
endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}",
309+
)
272310
resp = self._conn.send_request(request)
273311

274312
if not resp.is_success:

docs/cursor.rst

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ number of items in the result set may or may not be known in advance.
3333
'FOR doc IN students FILTER doc.age > @val RETURN doc',
3434
bind_vars={'val': 17},
3535
batch_size=2,
36-
count=True
36+
count=True,
37+
allow_retry=True
3738
)
3839

3940
# Get the cursor ID.
@@ -72,7 +73,11 @@ number of items in the result set may or may not be known in advance.
7273
cursor.pop()
7374

7475
# Fetch the next batch and add them to the cursor object.
75-
cursor.fetch()
76+
try:
77+
cursor.fetch()
78+
except CursorNextError:
79+
# Retry fetching the latest batch from the cursor.
80+
cursor.retry()
7681

7782
# Delete the cursor from the server.
7883
cursor.close()

tester.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# 4. Runs all python-arango tests, including enterprise tests.
88

99
# Usage:
10-
# ./start.sh [all|single|cluster] [all|community|enterprise] [version] ["notest"]
10+
# ./tester.sh [all|single|cluster] [all|community|enterprise] [version] ["notest"]
1111

1212
setup="${1:-all}"
1313
if [[ "$setup" != "all" && "$setup" != "single" && "$setup" != "cluster" ]]; then
@@ -21,10 +21,10 @@ if [[ "$tests" != "all" && "$tests" != "community" && "$tests" != "enterprise" ]
2121
exit 1
2222
fi
2323

24-
# 3.11.0
25-
# 3.10.6
24+
# 3.11.1
25+
# 3.10.9
2626
# 3.9.9
27-
version="${3:-3.11.0}"
27+
version="${3:-3.11.1}"
2828

2929
if [[ -n "$4" && "$4" != "notest" ]]; then
3030
echo "Invalid argument. Use 'notest' to only start the docker container, without running the tests."

tests/test_cursor.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pytest
2+
from packaging import version
23

34
from arango.exceptions import (
45
CursorCloseError,
@@ -262,6 +263,85 @@ def test_cursor_manual_fetch_and_pop(db, col, docs):
262263
assert err.value.message == "current batch is empty"
263264

264265

266+
def test_cursor_retry_disabled(db, col, db_version):
267+
cursor = db.aql.execute(
268+
f"FOR d IN {col.name} SORT d._key RETURN d",
269+
count=True,
270+
batch_size=1,
271+
ttl=1000,
272+
optimizer_rules=["+all"],
273+
profile=True,
274+
allow_retry=False,
275+
)
276+
result = cursor.fetch()
277+
assert result["id"] == cursor.id
278+
cursor._next_batch_id = "2"
279+
280+
if db_version >= version.parse("3.11.0"):
281+
with pytest.raises(CursorNextError) as err:
282+
cursor.retry()
283+
assert err.value.message == "batch id not found"
284+
285+
assert cursor.close(ignore_missing=True)
286+
287+
288+
def test_cursor_retry(db, col, docs, db_version):
289+
cursor = db.aql.execute(
290+
f"FOR d IN {col.name} SORT d._key RETURN d",
291+
count=True,
292+
batch_size=1,
293+
ttl=1000,
294+
optimizer_rules=["+all"],
295+
profile=True,
296+
allow_retry=True,
297+
)
298+
299+
assert cursor.count() == len(docs)
300+
doc = cursor.pop()
301+
assert clean_doc(doc) == docs[0]
302+
assert cursor.has_more()
303+
304+
result = cursor.fetch()
305+
assert result["id"] == cursor.id
306+
if db_version >= version.parse("3.11.0"):
307+
assert result["next_batch_id"] == "3"
308+
doc = cursor.pop()
309+
assert clean_doc(doc) == docs[1]
310+
assert cursor.empty()
311+
312+
# Decrease the next batch ID as if the previous fetch failed
313+
if db_version >= version.parse("3.11.0"):
314+
cursor._next_batch_id = "2"
315+
result = cursor.retry()
316+
assert result["id"] == cursor.id
317+
assert result["next_batch_id"] == "3"
318+
doc = cursor.pop()
319+
assert clean_doc(doc) == docs[1]
320+
assert cursor.empty()
321+
322+
# Fetch the next batches normally
323+
for batch in range(2, 5):
324+
result = cursor.fetch()
325+
assert result["id"] == cursor.id
326+
if db_version >= version.parse("3.11.0"):
327+
assert result["next_batch_id"] == str(batch + 2)
328+
doc = cursor.pop()
329+
assert clean_doc(doc) == docs[batch]
330+
331+
result = cursor.fetch()
332+
assert not cursor.has_more()
333+
assert "id" not in result
334+
assert "next_batch_id" not in result
335+
doc = cursor.pop()
336+
assert clean_doc(doc) == docs[-1]
337+
338+
if db_version >= version.parse("3.11.0"):
339+
assert cursor.close()
340+
else:
341+
with pytest.raises(CursorCloseError):
342+
cursor.close()
343+
344+
265345
def test_cursor_no_count(db, col):
266346
cursor = db.aql.execute(
267347
f"FOR d IN {col.name} SORT d._key RETURN d",

0 commit comments

Comments
 (0)