diff --git a/arangoasync/database.py b/arangoasync/database.py index 8ba7c62..c6f29e2 100644 --- a/arangoasync/database.py +++ b/arangoasync/database.py @@ -5,7 +5,8 @@ ] -from typing import List, Optional, Sequence, TypeVar, cast +from typing import Any, List, Optional, Sequence, TypeVar, cast +from warnings import warn from arangoasync.collection import StandardCollection from arangoasync.connection import Connection @@ -27,6 +28,7 @@ ServerStatusError, TransactionAbortError, TransactionCommitError, + TransactionExecuteError, TransactionInitError, TransactionListError, TransactionStatusError, @@ -1079,6 +1081,105 @@ def response_handler(resp: Response) -> Json: return await self._executor.execute(request, response_handler) + async def list_transactions(self) -> Result[Jsons]: + """List all currently running stream transactions. + + Returns: + list: List of transactions, with each transaction containing + an "id" and a "state" field. + + Raises: + TransactionListError: If the operation fails on the server side. + """ + request = Request(method=Method.GET, endpoint="/_api/transaction") + + def response_handler(resp: Response) -> Jsons: + if not resp.is_success: + raise TransactionListError(resp, request) + result: Json = self.deserializer.loads(resp.raw_body) + return cast(Jsons, result["transactions"]) + + return await self._executor.execute(request, response_handler) + + async def execute_transaction( + self, + command: str, + params: Optional[Json] = None, + read: Optional[str | Sequence[str]] = None, + write: Optional[str | Sequence[str]] = None, + exclusive: Optional[str | Sequence[str]] = None, + allow_implicit: Optional[bool] = None, + wait_for_sync: Optional[bool] = None, + lock_timeout: Optional[int] = None, + max_transaction_size: Optional[int] = None, + ) -> Result[Any]: + """Execute a JavaScript Transaction. + + Warning: + JavaScript Transactions are deprecated from ArangoDB v3.12.0 onward and + will be removed in a future version. + + Args: + command (str): The actual transaction operations to be executed, in the + form of stringified JavaScript code. + params (dict): Optional parameters passed into the JavaScript command. + read (str | list | None): Name(s) of collections read during transaction. + write (str | list | None): Name(s) of collections written to during + transaction with shared access. + exclusive (str | list | None): Name(s) of collections written to during + transaction with exclusive access. + allow_implicit (bool | None): Allow reading from undeclared collections. + wait_for_sync (bool | None): If `True`, will force the transaction to write + all data to disk before returning. + lock_timeout (int | None): Timeout for waiting on collection locks. Setting + it to 0 will prevent ArangoDB from timing out while waiting for a lock. + max_transaction_size (int | None): Transaction size limit in bytes. + + Returns: + Any: Result of the transaction. + + Raises: + TransactionExecuteError: If the operation fails on the server side. + + References: + - `execute-a-javascript-transaction `__ + """ # noqa: 501 + m = "JavaScript Transactions are deprecated from ArangoDB v3.12.0 onward and will be removed in a future version." # noqa: E501 + warn(m, DeprecationWarning, stacklevel=2) + + collections = dict() + if read is not None: + collections["read"] = read + if write is not None: + collections["write"] = write + if exclusive is not None: + collections["exclusive"] = exclusive + + data: Json = dict(collections=collections, action=command) + if params is not None: + data["params"] = params + if wait_for_sync is not None: + data["waitForSync"] = wait_for_sync + if allow_implicit is not None: + data["allowImplicit"] = allow_implicit + if lock_timeout is not None: + data["lockTimeout"] = lock_timeout + if max_transaction_size is not None: + data["maxTransactionSize"] = max_transaction_size + + request = Request( + method=Method.POST, + endpoint="/_api/transaction", + data=self.serializer.dumps(data), + ) + + def response_handler(resp: Response) -> Any: + if not resp.is_success: + raise TransactionExecuteError(resp, request) + return self.deserializer.loads(resp.raw_body)["result"] + + return await self._executor.execute(request, response_handler) + class StandardDatabase(Database): """Standard database API wrapper. @@ -1119,7 +1220,7 @@ async def begin_transaction( all data to disk before returning allow_implicit (bool | None): Allow reading from undeclared collections. lock_timeout (int | None): Timeout for waiting on collection locks. Setting - it to 0 will make ArangoDB not time out waiting for a lock. + it to 0 will prevent ArangoDB from timing out while waiting for a lock. max_transaction_size (int | None): Transaction size limit in bytes. allow_dirty_read (bool | None): If `True`, allows the Coordinator to ask any shard replica for the data, not only the shard leader. This may result @@ -1135,7 +1236,10 @@ async def begin_transaction( Raises: TransactionInitError: If the operation fails on the server side. - """ + + References: + - `begin-a-stream-transaction `__ + """ # noqa: E501 collections = dict() if read is not None: collections["read"] = read @@ -1188,26 +1292,6 @@ def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase": """ return TransactionDatabase(self.connection, transaction_id) - async def list_transactions(self) -> Result[Jsons]: - """List all currently running stream transactions. - - Returns: - list: List of transactions, with each transaction containing - an "id" and a "state" field. - - Raises: - TransactionListError: If the operation fails on the server side. - """ - request = Request(method=Method.GET, endpoint="/_api/transaction") - - def response_handler(resp: Response) -> Jsons: - if not resp.is_success: - raise TransactionListError(resp, request) - result: Json = self.deserializer.loads(resp.raw_body) - return cast(Jsons, result["transactions"]) - - return await self._executor.execute(request, response_handler) - class TransactionDatabase(Database): """Database API tailored specifically for @@ -1244,7 +1328,10 @@ async def transaction_status(self) -> str: Raises: TransactionStatusError: If the transaction is not found. - """ + + References: + - `get-the-status-of-a-stream-transaction `__ + """ # noqa: E501 request = Request( method=Method.GET, endpoint=f"/_api/transaction/{self.transaction_id}", @@ -1263,7 +1350,10 @@ async def commit_transaction(self) -> None: Raises: TransactionCommitError: If the operation fails on the server side. - """ + + References: + - `commit-a-stream-transaction `__ + """ # noqa: E501 request = Request( method=Method.PUT, endpoint=f"/_api/transaction/{self.transaction_id}", @@ -1276,7 +1366,14 @@ def response_handler(resp: Response) -> None: await self._executor.execute(request, response_handler) async def abort_transaction(self) -> None: - """Abort the transaction.""" + """Abort the transaction. + + Raises: + TransactionAbortError: If the operation fails on the server side. + + References: + - `abort-a-stream-transaction `__ + """ # noqa: E501 request = Request( method=Method.DELETE, endpoint=f"/_api/transaction/{self.transaction_id}", diff --git a/arangoasync/exceptions.py b/arangoasync/exceptions.py index 00e668b..a65a13e 100644 --- a/arangoasync/exceptions.py +++ b/arangoasync/exceptions.py @@ -203,6 +203,10 @@ class TransactionCommitError(ArangoServerError): """Failed to commit transaction.""" +class TransactionExecuteError(ArangoServerError): + """Failed to execute JavaScript transaction.""" + + class TransactionInitError(ArangoServerError): """Failed to initialize transaction.""" diff --git a/tests/test_transaction.py b/tests/test_transaction.py index e8730fd..f7d7f76 100644 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -7,11 +7,45 @@ from arangoasync.exceptions import ( TransactionAbortError, TransactionCommitError, + TransactionExecuteError, TransactionInitError, TransactionStatusError, ) +@pytest.mark.asyncio +async def test_transaction_execute_raw(db, doc_col, docs): + # Test a valid JS transaction + doc = docs[0] + key = doc["_key"] + command = f""" + function (params) {{ + var db = require('internal').db; + db.{doc_col.name}.save({{'_key': params.key, 'val': 1}}); + return true; + }} + """ # noqa: E702 E231 E272 E202 + result = await db.execute_transaction( + command=command, + params={"key": key}, + write=[doc_col.name], + read=[doc_col.name], + exclusive=[doc_col.name], + wait_for_sync=False, + lock_timeout=1000, + max_transaction_size=100000, + allow_implicit=True, + ) + assert result is True + doc = await doc_col.get(key) + assert doc is not None and doc["val"] == 1 + + # Test an invalid transaction + with pytest.raises(TransactionExecuteError) as err: + await db.execute_transaction(command="INVALID COMMAND") + assert err.value.error_code == BAD_PARAMETER + + @pytest.mark.asyncio async def test_transaction_document_insert(db, bad_db, doc_col, docs): # Start a basic transaction