Skip to content

Commit a62a246

Browse files
committed
Refactoring allow_retry implementation
1 parent b82de8c commit a62a246

File tree

4 files changed

+84
-41
lines changed

4 files changed

+84
-41
lines changed

arango/aql.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def execute(
313313
development to catch issues early. If set to False, warnings are
314314
returned with the query result. There is a server configuration
315315
option "--query.fail-on-warning" for setting the default value for
316-
this behaviour so it does not need to be set per-query.
316+
this behaviour, so it does not need to be set per-query.
317317
:type fail_on_warning: bool
318318
:param profile: Return additional profiling details in the cursor,
319319
unless the query cache is used.
@@ -437,7 +437,7 @@ def execute(
437437
def response_handler(resp: Response) -> Cursor:
438438
if not resp.is_success:
439439
raise AQLQueryExecuteError(resp, request)
440-
return Cursor(self._conn, resp.body)
440+
return Cursor(self._conn, resp.body, allow_retry=allow_retry)
441441

442442
return self._execute(request, response_handler)
443443

arango/cursor.py

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ class Cursor:
2727
:type init_data: dict
2828
:param cursor_type: Cursor type ("cursor" or "export").
2929
:type cursor_type: str
30+
:param allow_retry: If set to True, the cursor will always attempt to fetch
31+
the latest batch from server even if the previous attempt failed.
32+
This option is only available for server versions 3.11 and above.
33+
:type allow_retry: bool
3034
"""
3135

3236
__slots__ = [
@@ -41,16 +45,19 @@ class Cursor:
4145
"_has_more",
4246
"_batch",
4347
"_next_batch_id",
48+
"_allow_retry",
4449
]
4550

4651
def __init__(
4752
self,
4853
connection: BaseConnection,
4954
init_data: Json,
5055
cursor_type: str = "cursor",
56+
allow_retry: bool = False,
5157
) -> None:
5258
self._conn = connection
5359
self._type = cursor_type
60+
self._allow_retry = allow_retry
5461
self._batch: Deque[Any] = deque()
5562
self._id = None
5663
self._count: Optional[int] = None
@@ -280,33 +287,12 @@ def fetch(self) -> Json:
280287
"""
281288
if self._id is None:
282289
raise CursorStateError("cursor ID not set")
283-
request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}")
284-
resp = self._conn.send_request(request)
285-
286-
if not resp.is_success:
287-
raise CursorNextError(resp, request)
288290

289-
return self._update(resp.body)
290-
291-
def retry(self) -> Json:
292-
"""Retry fetching the next batch from server and update the cursor.
291+
endpoint = f"/_api/{self._type}/{self._id}"
292+
if self._allow_retry and self._next_batch_id is not None:
293+
endpoint += f"/{self._next_batch_id}"
293294

294-
Available only if the ``allowRetry`` query options is enabled.
295-
Introduced in 3.11.
296-
297-
:return: New batch details.
298-
:rtype: dict
299-
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
300-
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
301-
"""
302-
if self._id is None:
303-
raise CursorStateError("cursor ID not set")
304-
if self._id is None:
305-
raise CursorStateError("nextBatchId not set")
306-
request = Request(
307-
method="post",
308-
endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}",
309-
)
295+
request = Request(method="post", endpoint=endpoint)
310296
resp = self._conn.send_request(request)
311297

312298
if not resp.is_success:

docs/cursor.rst

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ number of items in the result set may or may not be known in advance.
3333
'FOR doc IN students FILTER doc.age > @val RETURN doc',
3434
bind_vars={'val': 17},
3535
batch_size=2,
36-
count=True,
37-
allow_retry=True
36+
count=True
3837
)
3938

4039
# Get the cursor ID.
@@ -73,11 +72,7 @@ number of items in the result set may or may not be known in advance.
7372
cursor.pop()
7473

7574
# Fetch the next batch and add them to the cursor object.
76-
try:
77-
cursor.fetch()
78-
except CursorNextError:
79-
# Retry fetching the latest batch from the cursor.
80-
cursor.retry()
75+
cursor.fetch()
8176

8277
# Delete the cursor from the server.
8378
cursor.close()
@@ -121,3 +116,54 @@ instead.
121116
cursor.fetch()
122117
while not cursor.empty(): # Pop until nothing is left on the cursor.
123118
cursor.pop()
119+
120+
With ArangoDB 3.11.0 or higher, you can also use the `allow_retry`
121+
parameter of :func:`arango.aql.AQL.execute` to automatically retry
122+
the request if the cursor encountered any issues during the previous
123+
fetch operation. Note that this feature causes the server to cache the
124+
last batch. To allow re-fetching of the very last batch of the query,
125+
the server cannot automatically delete the cursor. Once you have successfully
126+
received the last batch, you should call :func:`arango.cursor.Cursor.close`.
127+
128+
**Example:**
129+
130+
.. code-block:: python
131+
132+
from arango import ArangoClient
133+
134+
# Initialize the ArangoDB client.
135+
client = ArangoClient()
136+
137+
# Connect to "test" database as root user.
138+
db = client.db('test', username='root', password='passwd')
139+
140+
# Set up some test data to query against.
141+
db.collection('students').insert_many([
142+
{'_key': 'Abby', 'age': 22},
143+
{'_key': 'John', 'age': 18},
144+
{'_key': 'Mary', 'age': 21},
145+
{'_key': 'Suzy', 'age': 23},
146+
{'_key': 'Dave', 'age': 20}
147+
])
148+
149+
# Execute an AQL query which returns a cursor object.
150+
cursor = db.aql.execute(
151+
'FOR doc IN students FILTER doc.age > @val RETURN doc',
152+
bind_vars={'val': 17},
153+
batch_size=2,
154+
count=True,
155+
allow_retry=True
156+
)
157+
158+
while cursor.has_more():
159+
try:
160+
cursor.fetch()
161+
except ConnectionError:
162+
# Retry the request.
163+
continue
164+
165+
while not cursor.empty():
166+
cursor.pop()
167+
168+
# Delete the cursor from the server.
169+
cursor.close()

tests/test_cursor.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ def test_cursor_manual_fetch_and_pop(db, col, docs):
263263
assert err.value.message == "current batch is empty"
264264

265265

266-
def test_cursor_retry_disabled(db, col, db_version):
266+
def test_cursor_retry_disabled(db, col, docs, db_version):
267267
cursor = db.aql.execute(
268268
f"FOR d IN {col.name} SORT d._key RETURN d",
269269
count=True,
@@ -275,12 +275,17 @@ def test_cursor_retry_disabled(db, col, db_version):
275275
)
276276
result = cursor.fetch()
277277
assert result["id"] == cursor.id
278-
cursor._next_batch_id = "2"
279278

280-
if db_version >= version.parse("3.11.0"):
281-
with pytest.raises(CursorNextError) as err:
282-
cursor.retry()
283-
assert err.value.message == "batch id not found"
279+
while not cursor.empty():
280+
cursor.pop()
281+
282+
# The next batch ID should have no effect
283+
cursor._next_batch_id = "2"
284+
result = cursor.fetch()
285+
if db_version >= version.parse("3.11.1"):
286+
assert result["next_batch_id"] == "4"
287+
doc = cursor.pop()
288+
assert clean_doc(doc) == docs[2]
284289

285290
assert cursor.close(ignore_missing=True)
286291

@@ -312,7 +317,7 @@ def test_cursor_retry(db, col, docs, db_version):
312317
# Decrease the next batch ID as if the previous fetch failed
313318
if db_version >= version.parse("3.11.0"):
314319
cursor._next_batch_id = "2"
315-
result = cursor.retry()
320+
result = cursor.fetch()
316321
assert result["id"] == cursor.id
317322
assert result["next_batch_id"] == "3"
318323
doc = cursor.pop()
@@ -335,6 +340,12 @@ def test_cursor_retry(db, col, docs, db_version):
335340
doc = cursor.pop()
336341
assert clean_doc(doc) == docs[-1]
337342

343+
if db_version >= version.parse("3.11.0"):
344+
# We should be able to fetch the last batch again
345+
cursor.fetch()
346+
doc = cursor.pop()
347+
assert clean_doc(doc) == docs[-1]
348+
338349
if db_version >= version.parse("3.11.0"):
339350
assert cursor.close()
340351
else:

0 commit comments

Comments
 (0)