From e1bc6307b92c28b637a35e132299d4b82c50d46e Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Wed, 26 Feb 2025 09:46:15 +0530 Subject: [PATCH 1/3] Adding support for async jobs --- arangoasync/aql.py | 2 +- arangoasync/collection.py | 2 +- arangoasync/database.py | 116 ++++++++++++++++++++- arangoasync/exceptions.py | 24 +++++ arangoasync/executor.py | 52 +++++++++- arangoasync/job.py | 206 +++++++++++++++++++++++++++++++++++++- arangoasync/result.py | 9 ++ arangoasync/typings.py | 9 +- docs/specs.rst | 6 ++ tests/test_async.py | 103 +++++++++++++++++++ 10 files changed, 511 insertions(+), 18 deletions(-) create mode 100644 arangoasync/result.py create mode 100644 tests/test_async.py diff --git a/arangoasync/aql.py b/arangoasync/aql.py index 804744e..3ab03ce 100644 --- a/arangoasync/aql.py +++ b/arangoasync/aql.py @@ -26,6 +26,7 @@ from arangoasync.executor import ApiExecutor from arangoasync.request import Method, Request from arangoasync.response import Response +from arangoasync.result import Result from arangoasync.serialization import Deserializer, Serializer from arangoasync.typings import ( Json, @@ -34,7 +35,6 @@ QueryExplainOptions, QueryProperties, QueryTrackingConfiguration, - Result, ) diff --git a/arangoasync/collection.py b/arangoasync/collection.py index 69c271e..2d4f592 100644 --- a/arangoasync/collection.py +++ b/arangoasync/collection.py @@ -23,6 +23,7 @@ from arangoasync.executor import ApiExecutor from arangoasync.request import Method, Request from arangoasync.response import Response +from arangoasync.result import Result from arangoasync.serialization import Deserializer, Serializer from arangoasync.typings import ( CollectionProperties, @@ -30,7 +31,6 @@ Json, Jsons, Params, - Result, ) T = TypeVar("T") diff --git a/arangoasync/database.py b/arangoasync/database.py index 12913db..a17b1f2 100644 --- a/arangoasync/database.py +++ b/arangoasync/database.py @@ -13,6 +13,8 @@ from arangoasync.connection import Connection from 
arangoasync.errno import HTTP_FORBIDDEN, HTTP_NOT_FOUND from arangoasync.exceptions import ( + AsyncJobClearError, + AsyncJobListError, CollectionCreateError, CollectionDeleteError, CollectionListError, @@ -41,9 +43,15 @@ UserReplaceError, UserUpdateError, ) -from arangoasync.executor import ApiExecutor, DefaultApiExecutor, TransactionApiExecutor +from arangoasync.executor import ( + ApiExecutor, + AsyncApiExecutor, + DefaultApiExecutor, + TransactionApiExecutor, +) from arangoasync.request import Method, Request from arangoasync.response import Response +from arangoasync.result import Result from arangoasync.serialization import Deserializer, Serializer from arangoasync.typings import ( CollectionInfo, @@ -53,7 +61,6 @@ Jsons, KeyOptions, Params, - Result, ServerStatusInformation, UserInfo, ) @@ -1314,7 +1321,7 @@ def response_handler(resp: Response) -> str: return cast(str, result["id"]) transaction_id = await self._executor.execute(request, response_handler) - return TransactionDatabase(self.connection, transaction_id) + return TransactionDatabase(self.connection, cast(str, transaction_id)) def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase": """Fetch an existing transaction. @@ -1328,6 +1335,86 @@ def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase": """ return TransactionDatabase(self.connection, transaction_id) + def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase": + """Begin async execution. + + Args: + return_result (bool): If set to `True`, API executions return instances of + `arangoasync.job.AsyncJob`, which you can use to retrieve + results from server once available. Otherwise, API executions + return `None` and no results are stored on server. + + Returns: + AsyncDatabase: Database API wrapper tailored for async execution.
+ """ + return AsyncDatabase(self.connection, return_result) + + async def async_jobs( + self, status: str, count: Optional[int] = None + ) -> Result[List[str]]: + """Return IDs of async jobs with given status. + + Args: + status (str): Job status (e.g. "pending", "done"). + count (int | None): Max number of job IDs to return. + + Returns: + list: List of job IDs. + + Raises: + AsyncJobListError: If retrieval fails. + + References: + - `list-async-jobs-by-status-or-get-the-status-of-specific-job `__ + """ # noqa: E501 + params: Params = {} + if count is not None: + params["count"] = count + + request = Request( + method=Method.GET, endpoint=f"/_api/job/{status}", params=params + ) + + def response_handler(resp: Response) -> List[str]: + if resp.is_success: + return cast(List[str], self.deserializer.loads(resp.raw_body)) + raise AsyncJobListError(resp, request) + + return await self._executor.execute(request, response_handler) + + async def clear_async_jobs(self, threshold: Optional[float] = None) -> None: + """Clear async job results from the server. + + Async jobs that are still queued or running are not stopped. + Clients can use this method to perform an eventual garbage + collection of job results. + + Args: + threshold (float | None): If specified, only the job results created + prior to the threshold (a Unix timestamp) are deleted. Otherwise, + all job results are deleted. + + Raises: + AsyncJobClearError: If the operation fails. 
+ + References: + - `delete-async-job-results `__ + """ # noqa: E501 + if threshold is None: + request = Request(method=Method.DELETE, endpoint="/_api/job/all") + else: + request = Request( + method=Method.DELETE, + endpoint="/_api/job/expired", + params={"stamp": threshold}, + ) + + def response_handler(resp: Response) -> None: + if not resp.is_success: + raise AsyncJobClearError(resp, request) + + await self._executor.execute(request, response_handler) + class TransactionDatabase(Database): """Database API tailored specifically for @@ -1420,3 +1507,26 @@ def response_handler(resp: Response) -> None: raise TransactionAbortError(resp, request) await self._standard_executor.execute(request, response_handler) + + +class AsyncDatabase(Database): + """Database API wrapper tailored specifically for async execution. + + See :func:`arangoasync.database.StandardDatabase.begin_async_execution`. + + Args: + connection (Connection): HTTP connection. + return_result (bool): If set to `True`, API executions return instances of + :class:`arangoasync.job.AsyncJob`, which you can use to retrieve results + from server once available. If set to `False`, API executions return `None` + and no results are stored on server. 
+ + References: + - `jobs `__ + """ # noqa: E501 + + def __init__(self, connection: Connection, return_result: bool) -> None: + super().__init__(executor=AsyncApiExecutor(connection, return_result)) + + def __repr__(self) -> str: + return f"" diff --git a/arangoasync/exceptions.py b/arangoasync/exceptions.py index f78b7fb..8e3919f 100644 --- a/arangoasync/exceptions.py +++ b/arangoasync/exceptions.py @@ -135,6 +135,30 @@ class AQLQueryValidateError(ArangoServerError): """Failed to parse and validate query.""" +class AsyncExecuteError(ArangoServerError): + """Failed to execute async API request.""" + + +class AsyncJobCancelError(ArangoServerError): + """Failed to cancel async job.""" + + +class AsyncJobClearError(ArangoServerError): + """Failed to clear async job results.""" + + +class AsyncJobListError(ArangoServerError): + """Failed to retrieve async jobs.""" + + +class AsyncJobResultError(ArangoServerError): + """Failed to retrieve async job result.""" + + +class AsyncJobStatusError(ArangoServerError): + """Failed to retrieve async job status.""" + + class AuthHeaderError(ArangoClientError): """The authentication header could not be determined.""" diff --git a/arangoasync/executor.py b/arangoasync/executor.py index c830be7..b96623d 100644 --- a/arangoasync/executor.py +++ b/arangoasync/executor.py @@ -1,6 +1,8 @@ -from typing import Callable, TypeVar +from typing import Callable, Optional, TypeVar from arangoasync.connection import Connection +from arangoasync.exceptions import AsyncExecuteError +from arangoasync.job import AsyncJob from arangoasync.request import Request from arangoasync.response import Response from arangoasync.serialization import Deserializer, Serializer @@ -95,4 +97,50 @@ async def execute( return response_handler(response) -ApiExecutor = DefaultApiExecutor | TransactionApiExecutor +class AsyncApiExecutor(DefaultApiExecutor): + """Executes asynchronous API requests (jobs). + + Args: + connection: HTTP connection. 
+ return_result: If set to `True`, API executions return instances of + :class:`arangoasync.job.AsyncJob` and results can be retrieved from server + once available. If set to `False`, API executions return `None` and no + results are stored on server. + """ + + def __init__(self, connection: Connection, return_result: bool) -> None: + super().__init__(connection) + self._return_result = return_result + + @property + def context(self) -> str: + return "async" + + async def execute( + self, request: Request, response_handler: Callable[[Response], T] + ) -> Optional[AsyncJob[T]]: + """Execute an API request asynchronously. + + Args: + request: HTTP request. + response_handler: HTTP response handler. + + Returns: `AsyncJob` job or `None` if **return_result** parameter was set to + `False` during initialization. + """ + if self._return_result: + request.headers["x-arango-async"] = "store" + else: + request.headers["x-arango-async"] = "true" + + response = await self._conn.send_request(request) + if not response.is_success: + raise AsyncExecuteError(response, request) + if not self._return_result: + return None + + job_id = response.headers["x-arango-async-id"] + return AsyncJob(self._conn, job_id, response_handler) + + +ApiExecutor = DefaultApiExecutor | TransactionApiExecutor | AsyncApiExecutor diff --git a/arangoasync/job.py b/arangoasync/job.py index eb1be0e..cbfd442 100644 --- a/arangoasync/job.py +++ b/arangoasync/job.py @@ -1,12 +1,212 @@ __all__ = ["AsyncJob"] -from typing import Generic, TypeVar +import asyncio +from typing import Callable, Generic, Optional, TypeVar + +from arangoasync.connection import Connection +from arangoasync.errno import HTTP_NOT_FOUND +from arangoasync.exceptions import ( + AsyncJobCancelError, + AsyncJobClearError, + AsyncJobResultError, + AsyncJobStatusError, +) +from arangoasync.request import Method, Request +from arangoasync.response import Response T = TypeVar("T") class AsyncJob(Generic[T]): - """Job for tracking and retrieving 
result of an async API execution.""" + """Job for tracking and retrieving result of an async API execution. + + Args: + conn: HTTP connection. + job_id: Async job ID. + response_handler: HTTP response handler. + + References: + - `jobs `__ + """ # noqa: E501 + + def __init__( + self, + conn: Connection, + job_id: str, + response_handler: Callable[[Response], T], + ) -> None: + self._conn = conn + self._id = job_id + self._response_handler = response_handler + + def __repr__(self) -> str: + return f"" + + @property + def id(self) -> str: + """Return the async job ID. + + Returns: + str: Async job ID. + """ + return self._id + + async def status(self) -> str: + """Return the async job status from server. + + Once a job result is retrieved via :func:`arangoasync.job.AsyncJob.result` + method, it is deleted from server and subsequent status queries will + fail. + + Returns: + str: Async job status. Possible values are "pending" (job is still + in queue), "done" (job finished or raised an error). + + Raises: + ArangoError: If there is a problem with the request. + AsyncJobStatusError: If retrieval fails or the job is not found. + + References: + - `list-async-jobs-by-status-or-get-the-status-of-specific-job `__ + """ # noqa: E501 + request = Request(method=Method.GET, endpoint=f"/_api/job/{self._id}") + response = await self._conn.send_request(request) + + if response.is_success: + if response.status_code == 204: + return "pending" + else: + return "done" + if response.error_code == HTTP_NOT_FOUND: + error_message = f"job {self._id} not found" + raise AsyncJobStatusError(response, request, error_message) + raise AsyncJobStatusError(response, request) + + async def result(self) -> T: + """Fetch the async job result from server. + + If the job raised an exception, it is propagated up at this point. + + Once job result is retrieved, it is deleted from server and subsequent + queries for result will fail. + + Returns: + Async job result.
+ + Raises: + ArangoError: If the job raised an exception or there was a problem with + the request. + AsyncJobResultError: If retrieval fails. + + References: + - `get-the-results-of-an-async-job `__ + """ # noqa: E501 + request = Request(method=Method.PUT, endpoint=f"/_api/job/{self._id}") + response = await self._conn.send_request(request) + + if ( + "x-arango-async-id" in response.headers + or "X-Arango-Async-Id" in response.headers + ): + # The job result is available on the server + return self._response_handler(response) + + # The job is not known (anymore). + # We can tell the status from the HTTP status code. + if response.status_code == 204: + raise AsyncJobResultError(response, request, self._not_done()) + if response.error_code == HTTP_NOT_FOUND: + raise AsyncJobResultError(response, request, self._not_found()) + raise AsyncJobResultError(response, request) + + async def cancel(self, ignore_missing: bool = False) -> bool: + """Cancel the async job. + + An async job cannot be cancelled once it is taken out of the queue. + + Note: + It still might take some time to actually cancel the running async job. + + Args: + ignore_missing: Do not raise an exception if the job is not found. + + Returns: + `True` if job was cancelled successfully, `False` if the job was not found + but **ignore_missing** was set to `True`. + + Raises: + ArangoError: If there was a problem with the request. + AsyncJobCancelError: If cancellation fails. 
+ + References: + - `cancel-an-async-job `__ + """ # noqa: E501 + request = Request(method=Method.PUT, endpoint=f"/_api/job/{self._id}/cancel") + response = await self._conn.send_request(request) + + if response.is_success: + return True + if response.error_code == HTTP_NOT_FOUND: + if ignore_missing: + return False + raise AsyncJobCancelError(response, request, self._not_found()) + raise AsyncJobCancelError(response, request) + + async def clear( + self, + ignore_missing: bool = False, + ) -> bool: + """Delete the job result from the server. + + Args: + ignore_missing: Do not raise an exception if the job is not found. + + Returns: + `True` if result was deleted successfully, `False` if the job was + not found but **ignore_missing** was set to `True`. + + Raises: + ArangoError: If there was a problem with the request. + AsyncJobClearError: If deletion fails. + + References: + - `delete-async-job-results `__ + """ # noqa: E501 + request = Request(method=Method.DELETE, endpoint=f"/_api/job/{self._id}") + resp = await self._conn.send_request(request) + + if resp.is_success: + return True + if resp.error_code == HTTP_NOT_FOUND: + if ignore_missing: + return False + raise AsyncJobClearError(resp, request, self._not_found()) + raise AsyncJobClearError(resp, request) + + async def wait(self, seconds: Optional[float] = None) -> bool: + """Wait for the async job to finish. + + Args: + seconds: Approximate maximum number of seconds to wait; the job status is + polled once per second. If not provided, the method will wait indefinitely. + + Returns: + `True` if the job is done, `False` if it is still pending when the wait times out.
+ """ + while True: + if await self.status() == "done": + return True + if seconds is None: + await asyncio.sleep(1) + else: + seconds -= 1 + if seconds < 0: + return False + await asyncio.sleep(1) + + def _not_found(self) -> str: + return f"job {self._id} not found" - pass + def _not_done(self) -> str: + return f"job {self._id} not done" diff --git a/arangoasync/result.py b/arangoasync/result.py new file mode 100644 index 0000000..271eebd --- /dev/null +++ b/arangoasync/result.py @@ -0,0 +1,9 @@ +__all__ = ["Result"] + +from typing import TypeVar, Union + +from arangoasync.job import AsyncJob + +# The Result definition has to be in a separate module because of circular imports. +T = TypeVar("T") +Result = Union[T, AsyncJob[T], None] diff --git a/arangoasync/typings.py b/arangoasync/typings.py index a24367c..44631f8 100644 --- a/arangoasync/typings.py +++ b/arangoasync/typings.py @@ -9,15 +9,11 @@ MutableMapping, Optional, Tuple, - TypeVar, - Union, cast, ) from multidict import CIMultiDictProxy, MultiDict -from arangoasync.job import AsyncJob - Json = Dict[str, Any] Json.__doc__ = """Type definition for request/response body""" @@ -30,15 +26,12 @@ ResponseHeaders = MutableMapping[str, str] | MultiDict[str] | CIMultiDictProxy[str] ResponseHeaders.__doc__ = """Type definition for response HTTP headers""" -Params = MutableMapping[str, bool | int | str] +Params = MutableMapping[str, bool | int | str | float] Params.__doc__ = """Type definition for URL (query) parameters""" Formatter = Callable[[Json], Json] Formatter.__doc__ = """Type definition for a JSON formatter""" -T = TypeVar("T") -Result = Union[T, AsyncJob[T]] - class CollectionType(Enum): """Collection types.""" diff --git a/docs/specs.rst b/docs/specs.rst index db326e4..2de6ae9 100644 --- a/docs/specs.rst +++ b/docs/specs.rst @@ -19,6 +19,9 @@ python-arango-async. .. automodule:: arangoasync.aql :members: +.. automodule:: arangoasync.job + :members: + .. 
automodule:: arangoasync.cursor :members: @@ -48,3 +51,6 @@ python-arango-async. .. automodule:: arangoasync.typings :members: + +.. automodule:: arangoasync.result + :members: diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..cadeb2b --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,103 @@ +import asyncio +import time + +import pytest + +from arangoasync.exceptions import ( + AQLQueryExecuteError, + AsyncJobCancelError, + AsyncJobListError, +) + + +@pytest.mark.asyncio +async def test_async_no_result(db, bad_db, doc_col, docs): + # There should be no jobs to begin with + jobs = await db.async_jobs(status="pending") + assert len(jobs) == 0 + with pytest.raises(AsyncJobListError): + await bad_db.async_jobs(status="pending") + + # Create a basic job + async_db = db.begin_async_execution(return_result=False) + async_col = async_db.collection(doc_col.name) + + # Should return None, because return_result=False + job1 = await async_col.insert(docs[0]) + assert job1 is None + time.sleep(1) + # There should be none pending or done + jobs_pending, jobs_done = await asyncio.gather( + db.async_jobs(status="pending"), + db.async_jobs(status="done"), + ) + assert len(jobs_pending) == 0 + assert len(jobs_done) == 0 + + # Create a long-running job + aql = async_db.aql + job2, job3 = await asyncio.gather( + aql.execute("RETURN SLEEP(5)"), aql.execute("RETURN SLEEP(5)") + ) + time.sleep(1) + assert job2 is None + assert job3 is None + jobs_pending, jobs_done = await asyncio.gather( + db.async_jobs(status="pending"), + db.async_jobs(status="done"), + ) + assert len(jobs_pending) == 0 + assert len(jobs_done) == 0 + + with pytest.raises(AsyncJobListError): + await db.async_jobs(status="invalid-parameter") + with pytest.raises(AsyncJobListError): + await bad_db.async_jobs(status="pending") + + +@pytest.mark.asyncio +async def test_async_result(db, bad_db, doc_col, docs): + # There should be no jobs to begin with + jobs = await 
db.async_jobs(status="pending") + assert len(jobs) == 0 + + # Create a basic job and wait for it to finish + async_db = db.begin_async_execution(return_result=True) + async_col = async_db.collection(doc_col.name) + job1 = await async_col.insert(docs[0]) + await job1.wait() + assert await job1.status() == "done" + res = await job1.result() + assert isinstance(res, dict) + + # Check that exceptions are being propagated correctly + aql = async_db.aql + job2 = await aql.execute("INVALID QUERY") + await job2.wait() + with pytest.raises(AQLQueryExecuteError): + _ = await job2.result() + + # Long-running job + job3 = await aql.execute("RETURN SLEEP(5)") + time.sleep(1) + assert await job3.status() == "pending" + jobs = await db.async_jobs(status="pending") + assert len(jobs) == 1 + await job3.wait() + + # Clear jobs for which result has not been claimed + jobs = await db.async_jobs(status="done") + assert len(jobs) == 1 + await db.clear_async_jobs() + jobs = await db.async_jobs(status="done") + assert len(jobs) == 0 + + # Attempt to cancel a finished job + assert await job3.cancel(ignore_missing=True) is False + with pytest.raises(AsyncJobCancelError): + await job3.cancel() + + # Attempt to clear a single job + job4 = await aql.execute("RETURN 1") + await job4.wait() + await job4.clear() From 6a8ce081301f9725daa58afbec3a1a5fb0432941 Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Wed, 26 Feb 2025 13:28:47 +0530 Subject: [PATCH 2/3] New cursor test --- tests/test_cursor.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/test_cursor.py b/tests/test_cursor.py index fd0237f..f05116a 100644 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -270,6 +270,25 @@ async def test_cursor_context_manager(db, doc_col, docs): assert await cursor.close(ignore_missing=True) is False +@pytest.mark.asyncio +async def test_cursor_iteration(db, doc_col, docs): + # Insert documents + await asyncio.gather(*[doc_col.insert(doc) for doc in docs]) + + aql: AQL 
= db.aql + cursor = await aql.execute( + f"FOR d IN {doc_col.name} SORT d._key RETURN d", + count=True, + batch_size=2, + ttl=1000, + ) + doc_cnt = 0 + async with cursor as ctx: + async for _ in ctx: + doc_cnt += 1 + assert doc_cnt == len(docs) + + @pytest.mark.asyncio async def test_cursor_manual_fetch_and_pop(db, doc_col, docs): # Insert documents From f82a0de2cd92907345e1c022dc3e5ff327749c53 Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Wed, 26 Feb 2025 13:31:11 +0530 Subject: [PATCH 3/3] Handling cursors separately --- arangoasync/aql.py | 11 +++++++++-- arangoasync/cursor.py | 4 ++-- arangoasync/database.py | 1 + arangoasync/executor.py | 40 +++++++++++++++++++++++++++++++--------- arangoasync/job.py | 8 +++++--- tests/test_async.py | 32 ++++++++++++++++++++++++++++++++ 6 files changed, 80 insertions(+), 16 deletions(-) diff --git a/arangoasync/aql.py b/arangoasync/aql.py index 3ab03ce..c5a72db 100644 --- a/arangoasync/aql.py +++ b/arangoasync/aql.py @@ -23,7 +23,7 @@ AQLQueryTrackingSetError, AQLQueryValidateError, ) -from arangoasync.executor import ApiExecutor +from arangoasync.executor import ApiExecutor, DefaultApiExecutor, NonAsyncExecutor from arangoasync.request import Method, Request from arangoasync.response import Response from arangoasync.result import Result @@ -326,7 +326,14 @@ async def execute( def response_handler(resp: Response) -> Cursor: if not resp.is_success: raise AQLQueryExecuteError(resp, request) - return Cursor(self._executor, self.deserializer.loads(resp.raw_body)) + if self._executor.context == "async": + # We cannot have a cursor getting back async jobs + executor: NonAsyncExecutor = DefaultApiExecutor( + self._executor.connection + ) + else: + executor = cast(NonAsyncExecutor, self._executor) + return Cursor(executor, self.deserializer.loads(resp.raw_body)) return await self._executor.execute(request, response_handler) diff --git a/arangoasync/cursor.py b/arangoasync/cursor.py index 55ba40a..5339455 100644 --- 
a/arangoasync/cursor.py +++ b/arangoasync/cursor.py @@ -12,7 +12,7 @@ CursorNextError, CursorStateError, ) -from arangoasync.executor import ApiExecutor +from arangoasync.executor import NonAsyncExecutor from arangoasync.request import Method, Request from arangoasync.response import Response from arangoasync.serialization import Deserializer, Serializer @@ -39,7 +39,7 @@ class Cursor: is created. """ - def __init__(self, executor: ApiExecutor, data: Json) -> None: + def __init__(self, executor: NonAsyncExecutor, data: Json) -> None: self._executor = executor self._cached: Optional[bool] = None self._count: Optional[int] = None diff --git a/arangoasync/database.py b/arangoasync/database.py index a17b1f2..3022cc4 100644 --- a/arangoasync/database.py +++ b/arangoasync/database.py @@ -2,6 +2,7 @@ "Database", "StandardDatabase", "TransactionDatabase", + "AsyncDatabase", ] diff --git a/arangoasync/executor.py b/arangoasync/executor.py index b96623d..a550d65 100644 --- a/arangoasync/executor.py +++ b/arangoasync/executor.py @@ -1,3 +1,11 @@ +__all__ = [ + "ApiExecutor", + "DefaultApiExecutor", + "NonAsyncExecutor", + "TransactionApiExecutor", + "AsyncApiExecutor", +] + from typing import Callable, Optional, TypeVar from arangoasync.connection import Connection @@ -11,10 +19,10 @@ T = TypeVar("T") -class DefaultApiExecutor: - """Default API executor. +class ExecutorContext: + """Base class for API executors. - Responsible for executing requests and handling responses. + Not to be exported publicly. Args: connection: HTTP connection. 
@@ -27,10 +35,6 @@ def __init__(self, connection: Connection) -> None: def connection(self) -> Connection: return self._conn - @property - def context(self) -> str: - return "default" - @property def db_name(self) -> str: return self._conn.db_name @@ -49,6 +53,23 @@ def serialize(self, data: Json) -> str: def deserialize(self, data: bytes) -> Json: return self.deserializer.loads(data) + +class DefaultApiExecutor(ExecutorContext): + """Default API executor. + + Responsible for executing requests and handling responses. + + Args: + connection: HTTP connection. + """ + + def __init__(self, connection: Connection) -> None: + super().__init__(connection) + + @property + def context(self) -> str: + return "default" + async def execute( self, request: Request, response_handler: Callable[[Response], T] ) -> T: @@ -62,7 +83,7 @@ async def execute( return response_handler(response) -class TransactionApiExecutor(DefaultApiExecutor): +class TransactionApiExecutor(ExecutorContext): """Executes transaction API requests. Args: @@ -97,7 +118,7 @@ async def execute( return response_handler(response) -class AsyncApiExecutor(DefaultApiExecutor): +class AsyncApiExecutor(ExecutorContext): """Executes asynchronous API requests (jobs). Args: @@ -144,3 +165,4 @@ async def execute( ApiExecutor = DefaultApiExecutor | TransactionApiExecutor | AsyncApiExecutor +NonAsyncExecutor = DefaultApiExecutor | TransactionApiExecutor diff --git a/arangoasync/job.py b/arangoasync/job.py index cbfd442..13794fe 100644 --- a/arangoasync/job.py +++ b/arangoasync/job.py @@ -97,7 +97,8 @@ async def result(self) -> T: Raises: ArangoError: If the job raised an exception or there was a problem with the request. - AsyncJobResultError: If retrieval fails. + AsyncJobResultError: If retrieval fails, because job no longer exists or + is still pending. 
References: - `get-the-results-of-an-async-job `__ @@ -112,10 +113,11 @@ async def result(self) -> T: # The job result is available on the server return self._response_handler(response) - # The job is not known (anymore). - # We can tell the status from the HTTP status code. if response.status_code == 204: + # The job is still in the pending queue or not yet finished. raise AsyncJobResultError(response, request, self._not_done()) + # The job is not known (anymore). + # We can tell the status from the HTTP status code. if response.error_code == HTTP_NOT_FOUND: raise AsyncJobResultError(response, request, self._not_found()) raise AsyncJobResultError(response, request) diff --git a/tests/test_async.py b/tests/test_async.py index cadeb2b..c4f7988 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -7,6 +7,7 @@ AQLQueryExecuteError, AsyncJobCancelError, AsyncJobListError, + AsyncJobResultError, ) @@ -101,3 +102,34 @@ async def test_async_result(db, bad_db, doc_col, docs): job4 = await aql.execute("RETURN 1") await job4.wait() await job4.clear() + + # Attempt to get the result of a pending job + job5 = await aql.execute("RETURN SLEEP(5)") + time.sleep(1) + with pytest.raises(AsyncJobResultError): + _ = await job5.result() + await job5.wait() + + +@pytest.mark.asyncio +async def test_async_cursor(db, doc_col, docs): + # Insert some documents first + await asyncio.gather(*(doc_col.insert(doc) for doc in docs)) + + async_db = db.begin_async_execution() + aql = async_db.aql + job = await aql.execute( + f"FOR d IN {doc_col.name} SORT d._key RETURN d", + count=True, + batch_size=1, + ttl=1000, + ) + await job.wait() + + # Get the cursor. Its underlying executor is a default (non-async) one, so batches are fetched directly. + doc_cnt = 0 + cursor = await job.result() + async with cursor as ctx: + async for _ in ctx: + doc_cnt += 1 + assert doc_cnt == len(docs)