diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 44b14258..cdd55d25 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -29,7 +29,7 @@ jobs: - name: Create ArangoDB Docker container run: > docker create --name arango -p 8529:8529 -e ARANGO_ROOT_PASSWORD=passwd -v "$(pwd)/tests/static/":/tests/static - arangodb/arangodb:3.10.6 --server.jwt-secret-keyfile=/tests/static/keyfile + arangodb/arangodb:3.10.9 --server.jwt-secret-keyfile=/tests/static/keyfile - name: Start ArangoDB Docker container run: docker start arango diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e2fe760..f73dd0e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ main ---- +* Added allow_retry query parameter, making it possible to retry fetching the latest batch from a cursor. + * Added OverloadControlDatabase, enabling the client to react effectively to potential server overloads. * The db.version() now has a new optional parameter "details" that can be used to return additional information about diff --git a/arango/aql.py b/arango/aql.py index d5ba40f6..21ff248c 100644 --- a/arango/aql.py +++ b/arango/aql.py @@ -275,6 +275,7 @@ def execute( max_runtime: Optional[Number] = None, fill_block_cache: Optional[bool] = None, allow_dirty_read: bool = False, + allow_retry: bool = False, ) -> Result[Cursor]: """Execute the query and return the result cursor. @@ -369,6 +370,9 @@ def execute( :type fill_block_cache: bool :param allow_dirty_read: Allow reads from followers in a cluster. :type allow_dirty_read: bool | None + :param allow_retry: Make it possible to retry fetching the latest batch + from a cursor. + :type allow_retry: bool :return: Result cursor. :rtype: arango.cursor.Cursor :raise arango.exceptions.AQLQueryExecuteError: If execute fails. @@ -415,6 +419,10 @@ def execute( if max_runtime is not None: options["maxRuntime"] = max_runtime + # New in 3.11 + if allow_retry is not None: + options["allowRetry"] = allow_retry + if options: data["options"] = options data.update(options) diff --git a/arango/cursor.py b/arango/cursor.py index a42cb784..5a51a715 100644 --- a/arango/cursor.py +++ b/arango/cursor.py @@ -40,6 +40,7 @@ class Cursor: "_warnings", "_has_more", "_batch", + "_next_batch_id", ] def __init__( @@ -57,6 +58,7 @@ def __init__( self._stats = None self._profile = None self._warnings = None + self._next_batch_id: Optional[str] = None self._update(init_data) def __iter__(self) -> "Cursor": @@ -99,6 +101,11 @@ def _update(self, data: Json) -> Json: self._cached = data["cached"] result["cached"] = data["cached"] + # New in 3.11 + if "nextBatchId" in data: + self._next_batch_id = data["nextBatchId"] + result["next_batch_id"] = data["nextBatchId"] + self._has_more = bool(data["hasMore"]) result["has_more"] = data["hasMore"] @@ -138,6 +145,11 @@ def _update(self, data: Json) -> Json: stats["cacheHits"] = stats.pop("cacheHits") if "cacheMisses" in stats: stats["cacheMisses"] = stats.pop("cacheMisses") + + # New in 3.11 + if "peakMemoryUsage" in stats: + stats["peak_memory_usage"] = stats.pop("peakMemoryUsage") + self._stats = stats result["statistics"] = stats @@ -268,7 +280,33 @@ def fetch(self) -> Json: """ if self._id is None: raise CursorStateError("cursor ID not set") - request = Request(method="put", endpoint=f"/_api/{self._type}/{self._id}") + request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}") + resp = self._conn.send_request(request) + + if not resp.is_success: + raise CursorNextError(resp, request) + + return self._update(resp.body) + + def retry(self) -> Json: + """Retry fetching the next batch from server and update the cursor. + + Available only if the ``allowRetry`` query options is enabled. + Introduced in 3.11. + + :return: New batch details. + :rtype: dict + :raise arango.exceptions.CursorNextError: If batch retrieval fails. + :raise arango.exceptions.CursorStateError: If cursor ID is not set. + """ + if self._id is None: + raise CursorStateError("cursor ID not set") + if self._id is None: + raise CursorStateError("nextBatchId not set") + request = Request( + method="post", + endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}", + ) resp = self._conn.send_request(request) if not resp.is_success: diff --git a/docs/cursor.rst b/docs/cursor.rst index 10d12292..6a713733 100644 --- a/docs/cursor.rst +++ b/docs/cursor.rst @@ -33,7 +33,8 @@ number of items in the result set may or may not be known in advance. 'FOR doc IN students FILTER doc.age > @val RETURN doc', bind_vars={'val': 17}, batch_size=2, - count=True + count=True, + allow_retry=True ) # Get the cursor ID. @@ -72,7 +73,11 @@ number of items in the result set may or may not be known in advance. cursor.pop() # Fetch the next batch and add them to the cursor object. - cursor.fetch() + try: + cursor.fetch() + except CursorNextError: + # Retry fetching the latest batch from the cursor. + cursor.retry() # Delete the cursor from the server. cursor.close() diff --git a/tester.sh b/tester.sh index 977a56aa..7842427d 100755 --- a/tester.sh +++ b/tester.sh @@ -7,7 +7,7 @@ # 4. Runs all python-arango tests, including enterprise tests. # Usage: -# ./start.sh [all|single|cluster] [all|community|enterprise] [version] ["notest"] +# ./tester.sh [all|single|cluster] [all|community|enterprise] [version] ["notest"] setup="${1:-all}" if [[ "$setup" != "all" && "$setup" != "single" && "$setup" != "cluster" ]]; then @@ -21,10 +21,10 @@ if [[ "$tests" != "all" && "$tests" != "community" && "$tests" != "enterprise" ] exit 1 fi -# 3.11.0 -# 3.10.6 +# 3.11.1 +# 3.10.9 # 3.9.9 -version="${3:-3.11.0}" +version="${3:-3.11.1}" if [[ -n "$4" && "$4" != "notest" ]]; then echo "Invalid argument. Use 'notest' to only start the docker container, without running the tests." diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 3dba809d..2c370fd1 100644 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -1,4 +1,5 @@ import pytest +from packaging import version from arango.exceptions import ( CursorCloseError, @@ -262,6 +263,85 @@ def test_cursor_manual_fetch_and_pop(db, col, docs): assert err.value.message == "current batch is empty" +def test_cursor_retry_disabled(db, col, db_version): + cursor = db.aql.execute( + f"FOR d IN {col.name} SORT d._key RETURN d", + count=True, + batch_size=1, + ttl=1000, + optimizer_rules=["+all"], + profile=True, + allow_retry=False, + ) + result = cursor.fetch() + assert result["id"] == cursor.id + cursor._next_batch_id = "2" + + if db_version >= version.parse("3.11.0"): + with pytest.raises(CursorNextError) as err: + cursor.retry() + assert err.value.message == "batch id not found" + + assert cursor.close(ignore_missing=True) + + +def test_cursor_retry(db, col, docs, db_version): + cursor = db.aql.execute( + f"FOR d IN {col.name} SORT d._key RETURN d", + count=True, + batch_size=1, + ttl=1000, + optimizer_rules=["+all"], + profile=True, + allow_retry=True, + ) + + assert cursor.count() == len(docs) + doc = cursor.pop() + assert clean_doc(doc) == docs[0] + assert cursor.has_more() + + result = cursor.fetch() + assert result["id"] == cursor.id + if db_version >= version.parse("3.11.0"): + assert result["next_batch_id"] == "3" + doc = cursor.pop() + assert clean_doc(doc) == docs[1] + assert cursor.empty() + + # Decrease the next batch ID as if the previous fetch failed + if db_version >= version.parse("3.11.0"): + cursor._next_batch_id = "2" + result = cursor.retry() + assert result["id"] == cursor.id + assert result["next_batch_id"] == "3" + doc = cursor.pop() + assert clean_doc(doc) == docs[1] + assert cursor.empty() + + # Fetch the next batches normally + for batch in range(2, 5): + result = cursor.fetch() + assert result["id"] == cursor.id + if db_version >= version.parse("3.11.0"): + assert result["next_batch_id"] == str(batch + 2) + doc = cursor.pop() + assert clean_doc(doc) == docs[batch] + + result = cursor.fetch() + assert not cursor.has_more() + assert "id" not in result + assert "next_batch_id" not in result + doc = cursor.pop() + assert clean_doc(doc) == docs[-1] + + if db_version >= version.parse("3.11.0"): + assert cursor.close() + else: + with pytest.raises(CursorCloseError): + cursor.close() + + def test_cursor_no_count(db, col): cursor = db.aql.execute( f"FOR d IN {col.name} SORT d._key RETURN d",