diff --git a/arangoasync/aql.py b/arangoasync/aql.py index c5a72db..c0e1b29 100644 --- a/arangoasync/aql.py +++ b/arangoasync/aql.py @@ -327,7 +327,7 @@ def response_handler(resp: Response) -> Cursor: if not resp.is_success: raise AQLQueryExecuteError(resp, request) if self._executor.context == "async": - # We cannot have a cursor getting back async jobs + # We cannot have a cursor giving back async jobs executor: NonAsyncExecutor = DefaultApiExecutor( self._executor.connection ) diff --git a/arangoasync/collection.py b/arangoasync/collection.py index 2d4f592..b6bb483 100644 --- a/arangoasync/collection.py +++ b/arangoasync/collection.py @@ -1,26 +1,34 @@ __all__ = ["Collection", "StandardCollection"] -from typing import Generic, List, Optional, Tuple, TypeVar, cast +from typing import Any, Generic, List, Optional, Sequence, Tuple, TypeVar, cast +from arangoasync.cursor import Cursor from arangoasync.errno import ( + DOCUMENT_NOT_FOUND, HTTP_BAD_PARAMETER, HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED, ) from arangoasync.exceptions import ( CollectionPropertiesError, + CollectionTruncateError, + DocumentCountError, + DocumentDeleteError, DocumentGetError, DocumentInsertError, DocumentParseError, + DocumentReplaceError, DocumentRevisionError, + DocumentUpdateError, IndexCreateError, IndexDeleteError, IndexGetError, IndexListError, IndexLoadError, + SortValidationError, ) -from arangoasync.executor import ApiExecutor +from arangoasync.executor import ApiExecutor, DefaultApiExecutor, NonAsyncExecutor from arangoasync.request import Method, Request from arangoasync.response import Response from arangoasync.result import Result @@ -31,11 +39,12 @@ Json, Jsons, Params, + RequestHeaders, ) -T = TypeVar("T") -U = TypeVar("U") -V = TypeVar("V") +T = TypeVar("T") # Serializer type +U = TypeVar("U") # Deserializer loads +V = TypeVar("V") # Deserializer loads_many class Collection(Generic[T, U, V]): @@ -149,6 +158,90 @@ def _prep_from_doc( else: return doc_id, {"If-Match": rev} + def _build_filter_conditions(self, filters: Optional[Json]) -> str: + """Build filter conditions for an AQL query. + + Args: + filters (dict | None): Document filters. + + Returns: + str: The complete AQL filter condition. + """ + if not filters: + return "" + + conditions = [] + for k, v in filters.items(): + field = k if "." in k else f"`{k}`" + conditions.append(f"doc.{field} == {self.serializer.dumps(v)}") + + return "FILTER " + " AND ".join(conditions) + + @staticmethod + def _is_none_or_int(obj: Any) -> bool: + """Check if obj is `None` or a positive integer. + + Args: + obj: Object to check. + + Returns: + bool: `True` if object is `None` or a positive integer. + """ + return obj is None or isinstance(obj, int) and obj >= 0 + + @staticmethod + def _is_none_or_dict(obj: Any) -> bool: + """Check if obj is `None` or a dict. + + Args: + obj: Object to check. + + Returns: + bool: `True` if object is `None` or a dict. + """ + return obj is None or isinstance(obj, dict) + + @staticmethod + def _validate_sort_parameters(sort: Optional[Jsons]) -> None: + """Validate sort parameters for an AQL query. + + Args: + sort (list | None): Document sort parameters. + + Raises: + SortValidationError: If sort parameters are invalid. + """ + if not sort: + return + + for param in sort: + if "sort_by" not in param or "sort_order" not in param: + raise SortValidationError( + "Each sort parameter must have 'sort_by' and 'sort_order'." 
+ ) + if param["sort_order"].upper() not in ["ASC", "DESC"]: + raise SortValidationError("'sort_order' must be either 'ASC' or 'DESC'") + + @staticmethod + def _build_sort_expression(sort: Optional[Jsons]) -> str: + """Build a sort condition for an AQL query. + + Args: + sort (list | None): Document sort parameters. + + Returns: + str: The complete AQL sort condition. + """ + if not sort: + return "" + + sort_chunks = [] + for sort_param in sort: + chunk = f"doc.{sort_param['sort_by']} {sort_param['sort_order']}" + sort_chunks.append(chunk) + + return "SORT " + ", ".join(sort_chunks) + @property def name(self) -> str: """Return the name of the collection. @@ -387,26 +480,86 @@ def response_handler(resp: Response) -> CollectionProperties: return await self._executor.execute(request, response_handler) + async def truncate( + self, + wait_for_sync: Optional[bool] = None, + compact: Optional[bool] = None, + ) -> None: + """Removes all documents, but leaves indexes intact. + + Args: + wait_for_sync (bool | None): If set to `True`, the data is synchronized + to disk before returning from the truncate operation. + compact (bool | None): If set to `True`, the storage engine is told to + start a compaction in order to free up disk space. This can be + resource intensive. If the only intention is to start over with an + empty collection, specify `False`. + + Raises: + CollectionTruncateError: If truncation fails. + + References: + - `truncate-a-collection `__ + """ # noqa: E501 + params: Params = {} + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if compact is not None: + params["compact"] = compact + + request = Request( + method=Method.PUT, + endpoint=f"/_api/collection/{self.name}/truncate", + params=params, + ) + + def response_handler(resp: Response) -> None: + if not resp.is_success: + raise CollectionTruncateError(resp, request) + + await self._executor.execute(request, response_handler) + + async def count(self) -> Result[int]: + """Return the total document count. + + Returns: + int: Total document count. + + Raises: + DocumentCountError: If retrieval fails. + """ + request = Request( + method=Method.GET, endpoint=f"/_api/collection/{self.name}/count" + ) + + def response_handler(resp: Response) -> int: + if resp.is_success: + result: int = self.deserializer.loads(resp.raw_body)["count"] + return result + raise DocumentCountError(resp, request) + + return await self._executor.execute(request, response_handler) + async def get( self, document: str | Json, - rev: Optional[str] = None, - check_rev: bool = True, allow_dirty_read: bool = False, + if_match: Optional[str] = None, + if_none_match: Optional[str] = None, ) -> Result[Optional[U]]: """Return a document. Args: document (str | dict): Document ID, key or body. - Document body must contain the "_id" or "_key" field. - rev (str | None): Expected document revision. Overrides the - value of "_rev" field in **document** if present. - check_rev (bool): If set to True, revision of **document** (if given) - is compared against the revision of target document. + Document body must contain the "_id" or "_key" field. allow_dirty_read (bool): Allow reads from followers in a cluster. + if_match (str | None): The document is returned, if it has the same + revision as the given ETag. + if_none_match (str | None): The document is returned, if it has a + different revision than the given ETag. Returns: - Document or None if not found. + Document or `None` if not found. Raises: DocumentRevisionError: If the revision is incorrect. 
@@ -415,10 +568,15 @@ async def get( References: - `get-a-document `__ """ # noqa: E501 - handle, headers = self._prep_from_doc(document, rev, check_rev) + handle, _ = self._prep_from_doc(document) + headers: RequestHeaders = {} if allow_dirty_read: headers["x-arango-allow-dirty-read"] = "true" + if if_match is not None: + headers["If-Match"] = if_match + if if_none_match is not None: + headers["If-None-Match"] = if_none_match request = Request( method=Method.GET, @@ -430,7 +588,66 @@ def response_handler(resp: Response) -> Optional[U]: if resp.is_success: return self._doc_deserializer.loads(resp.raw_body) elif resp.status_code == HTTP_NOT_FOUND: - return None + if resp.error_code == DOCUMENT_NOT_FOUND: + return None + else: + raise DocumentGetError(resp, request) + elif resp.status_code == HTTP_PRECONDITION_FAILED: + raise DocumentRevisionError(resp, request) + else: + raise DocumentGetError(resp, request) + + return await self._executor.execute(request, response_handler) + + async def has( + self, + document: str | Json, + allow_dirty_read: bool = False, + if_match: Optional[str] = None, + if_none_match: Optional[str] = None, + ) -> Result[bool]: + """Check if a document exists in the collection. + + Args: + document (str | dict): Document ID, key or body. + Document body must contain the "_id" or "_key" field. + allow_dirty_read (bool): Allow reads from followers in a cluster. + if_match (str | None): The document is returned, if it has the same + revision as the given ETag. + if_none_match (str | None): The document is returned, if it has a + different revision than the given ETag. + + Returns: + `True` if the document exists, `False` otherwise. + + Raises: + DocumentRevisionError: If the revision is incorrect. + DocumentGetError: If retrieval fails. + + References: + - `get-a-document-header `__ + """ # noqa: E501 + handle, _ = self._prep_from_doc(document) + + headers: RequestHeaders = {} + if allow_dirty_read: + headers["x-arango-allow-dirty-read"] = "true" + if if_match is not None: + headers["If-Match"] = if_match + if if_none_match is not None: + headers["If-None-Match"] = if_none_match + + request = Request( + method=Method.HEAD, + endpoint=f"/_api/document/{handle}", + headers=headers, + ) + + def response_handler(resp: Response) -> bool: + if resp.is_success: + return True + elif resp.status_code == HTTP_NOT_FOUND: + return False elif resp.status_code == HTTP_PRECONDITION_FAILED: raise DocumentRevisionError(resp, request) else: @@ -474,15 +691,15 @@ async def insert( retained in the document. Otherwise, they are removed completely. Applies only when **overwrite_mode** is set to "update" (update-insert). - merge_objects (bool | None): If set to True, sub-dictionaries are merged + merge_objects (bool | None): If set to `True`, sub-dictionaries are merged instead of the new one overwriting the old one. Applies only when **overwrite_mode** is set to "update" (update-insert). refill_index_caches (bool | None): Whether to add new entries to in-memory index caches if document insertions affect the edge index or cache-enabled persistent indexes. version_attribute (str | None): Support for simple external versioning to - document operations. Only applicable if **overwrite** is set to true - or **overwriteMode** is set to "update" or "replace". + document operations. Only applicable if **overwrite** is set to `True` + or **overwrite_mode** is set to "update" or "replace". Returns: bool | dict: Document metadata (e.g. 
document id, key, revision) or `True` @@ -543,3 +760,929 @@ def response_handler(resp: Response) -> bool | Json: raise DocumentInsertError(resp, request, msg) return await self._executor.execute(request, response_handler) + + async def update( + self, + document: T, + ignore_revs: Optional[bool] = None, + wait_for_sync: Optional[bool] = None, + return_new: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + keep_null: Optional[bool] = None, + merge_objects: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + version_attribute: Optional[str] = None, + if_match: Optional[str] = None, + ) -> Result[bool | Json]: + """Insert a new document. + + Args: + document (dict): Partial or full document with the updated values. + It must contain the "_key" or "_id" field. + ignore_revs (bool | None): If set to `True`, the `_rev` attribute in the + document is ignored. If this is set to `False`, then the `_rev` + attribute given in the body document is taken as a precondition. + The document is only updated if the current revision is the one + specified. + wait_for_sync (bool | None): Wait until document has been synced to disk. + return_new (bool | None): Additionally return the complete new document + under the attribute `new` in the result. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. + silent (bool | None): If set to `True`, no document metadata is returned. + This can be used to save resources. + keep_null (bool | None): If the intention is to delete existing attributes + with the patch command, set this parameter to `False`. + merge_objects (bool | None): Controls whether objects (not arrays) are + merged if present in both the existing and the patch document. + If set to `False`, the value in the patch document overwrites the + existing document’s value. If set to `True`, objects are merged. + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document updates affect the edge index + or cache-enabled persistent indexes. + version_attribute (str | None): Support for simple external versioning to + document operations. + if_match (str | None): You can conditionally update a document based on a + target revision id by using the "if-match" HTTP header. + + Returns: + bool | dict: Document metadata (e.g. document id, key, revision) or `True` + if **silent** is set to `True`. + + Raises: + DocumentRevisionError: If precondition was violated. + DocumentUpdateError: If update fails. 
+ + References: + - `update-a-document `__ + """ # noqa: E501 + params: Params = {} + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if return_new is not None: + params["returnNew"] = return_new + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if keep_null is not None: + params["keepNull"] = keep_null + if merge_objects is not None: + params["mergeObjects"] = merge_objects + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + if version_attribute is not None: + params["versionAttribute"] = version_attribute + + headers: RequestHeaders = {} + if if_match is not None: + headers["If-Match"] = if_match + + request = Request( + method=Method.PATCH, + endpoint=f"/_api/document/{self._extract_id(cast(Json, document))}", + params=params, + headers=headers, + data=self._doc_serializer.dumps(document), + ) + + def response_handler(resp: Response) -> bool | Json: + if resp.is_success: + if silent is True: + return True + return self._executor.deserialize(resp.raw_body) + msg: Optional[str] = None + if resp.status_code == HTTP_PRECONDITION_FAILED: + raise DocumentRevisionError(resp, request) + elif resp.status_code == HTTP_NOT_FOUND: + msg = "Document, collection or transaction not found." + raise DocumentUpdateError(resp, request, msg) + + return await self._executor.execute(request, response_handler) + + async def replace( + self, + document: T, + ignore_revs: Optional[bool] = None, + wait_for_sync: Optional[bool] = None, + return_new: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + version_attribute: Optional[str] = None, + if_match: Optional[str] = None, + ) -> Result[bool | Json]: + """Replace a document. + + Args: + document (dict): New document. It must contain the "_key" or "_id" field. + Edge document must also have "_from" and "_to" fields. + ignore_revs (bool | None): If set to `True`, the `_rev` attribute in the + document is ignored. If this is set to `False`, then the `_rev` + attribute given in the body document is taken as a precondition. + The document is only replaced if the current revision is the one + specified. + wait_for_sync (bool | None): Wait until document has been synced to disk. + return_new (bool | None): Additionally return the complete new document + under the attribute `new` in the result. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. + silent (bool | None): If set to `True`, no document metadata is returned. + This can be used to save resources. + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document updates affect the edge index + or cache-enabled persistent indexes. + version_attribute (str | None): Support for simple external versioning to + document operations. + if_match (str | None): You can conditionally replace a document based on a + target revision id by using the "if-match" HTTP header. + + Returns: + bool | dict: Document metadata (e.g. document id, key, revision) or `True` + if **silent** is set to `True`. + + Raises: + DocumentRevisionError: If precondition was violated. + DocumentReplaceError: If replace fails. 
+ + References: + - `replace-a-document `__ + """ # noqa: E501 + params: Params = {} + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if return_new is not None: + params["returnNew"] = return_new + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + if version_attribute is not None: + params["versionAttribute"] = version_attribute + + headers: RequestHeaders = {} + if if_match is not None: + headers["If-Match"] = if_match + + request = Request( + method=Method.PUT, + endpoint=f"/_api/document/{self._extract_id(cast(Json, document))}", + params=params, + headers=headers, + data=self._doc_serializer.dumps(document), + ) + + def response_handler(resp: Response) -> bool | Json: + if resp.is_success: + if silent is True: + return True + return self._executor.deserialize(resp.raw_body) + msg: Optional[str] = None + if resp.status_code == HTTP_PRECONDITION_FAILED: + raise DocumentRevisionError(resp, request) + elif resp.status_code == HTTP_NOT_FOUND: + msg = "Document, collection or transaction not found." + raise DocumentReplaceError(resp, request, msg) + + return await self._executor.execute(request, response_handler) + + async def delete( + self, + document: T, + ignore_revs: Optional[bool] = None, + ignore_missing: bool = False, + wait_for_sync: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + if_match: Optional[str] = None, + ) -> Result[bool | Json]: + """Delete a document. + + Args: + document (dict): Document ID, key or body. The body must contain the + "_key" or "_id" field. + ignore_revs (bool | None): If set to `True`, the `_rev` attribute in the + document is ignored. If this is set to `False`, then the `_rev` + attribute given in the body document is taken as a precondition. + The document is only replaced if the current revision is the one + specified. + ignore_missing (bool): Do not raise an exception on missing document. + This parameter has no effect in transactions where an exception is + always raised on failures. + wait_for_sync (bool | None): Wait until operation has been synced to disk. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. + silent (bool | None): If set to `True`, no document metadata is returned. + This can be used to save resources. + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document updates affect the edge index + or cache-enabled persistent indexes. + if_match (bool | None): You can conditionally remove a document based + on a target revision id by using the "if-match" HTTP header. + + Returns: + bool | dict: Document metadata (e.g. document id, key, revision) or `True` + if **silent** is set to `True` and the document was found. + + Raises: + DocumentRevisionError: If precondition was violated. + DocumentDeleteError: If deletion fails. 
+ + References: + - `remove-a-document `__ + """ # noqa: E501 + params: Params = {} + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + + headers: RequestHeaders = {} + if if_match is not None: + headers["If-Match"] = if_match + + request = Request( + method=Method.DELETE, + endpoint=f"/_api/document/{self._extract_id(cast(Json, document))}", + params=params, + headers=headers, + ) + + def response_handler(resp: Response) -> bool | Json: + if resp.is_success: + if silent is True: + return True + return self._executor.deserialize(resp.raw_body) + msg: Optional[str] = None + if resp.status_code == HTTP_PRECONDITION_FAILED: + raise DocumentRevisionError(resp, request) + elif resp.status_code == HTTP_NOT_FOUND: + if resp.error_code == DOCUMENT_NOT_FOUND and ignore_missing: + return False + msg = "Document, collection or transaction not found." + raise DocumentDeleteError(resp, request, msg) + + return await self._executor.execute(request, response_handler) + + async def get_many( + self, + documents: Sequence[str | T], + allow_dirty_read: Optional[bool] = None, + ignore_revs: Optional[bool] = None, + ) -> Result[V]: + """Return multiple documents ignoring any missing ones. + + Args: + documents (list): List of document IDs, keys or bodies. A search document + must contain at least a value for the `_key` field. A value for `_rev` + may be specified to verify whether the document has the same revision + value, unless `ignoreRevs` is set to false. + allow_dirty_read (bool | None): Allow reads from followers in a cluster. + ignore_revs (bool | None): If set to `True`, the `_rev` attribute in the + document is ignored. If this is set to `False`, then the `_rev` + attribute given in the body document is taken as a precondition. + The document is only replaced if the current revision is the one + specified. + + Returns: + list: List of documents. Missing ones are not included. + + Raises: + DocumentGetError: If retrieval fails. + + References: + - `get-multiple-documents `__ + """ # noqa: E501 + params: Params = {"onlyget": True} + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + + headers: RequestHeaders = {} + if allow_dirty_read is not None: + if allow_dirty_read is True: + headers["x-arango-allow-dirty-read"] = "true" + else: + headers["x-arango-allow-dirty-read"] = "false" + + request = Request( + method=Method.PUT, + endpoint=f"/_api/document/{self.name}", + params=params, + headers=headers, + data=self._doc_serializer.dumps(documents), + ) + + def response_handler(resp: Response) -> V: + if not resp.is_success: + raise DocumentGetError(resp, request) + return self._doc_deserializer.loads_many(resp.raw_body) + + return await self._executor.execute(request, response_handler) + + async def find( + self, + filters: Optional[Json] = None, + skip: Optional[int] = None, + limit: Optional[int | str] = None, + allow_dirty_read: Optional[bool] = False, + sort: Optional[Jsons] = None, + ) -> Result[Cursor]: + """Return all documents that match the given filters. + + Args: + filters (dict | None): Query filters. + skip (int | None): Number of documents to skip. + limit (int | str | None): Maximum number of documents to return. + allow_dirty_read (bool): Allow reads from followers in a cluster. 
+ sort (list | None): Document sort parameters. + + Returns: + Cursor: Document cursor. + + Raises: + DocumentGetError: If retrieval fails. + SortValidationError: If sort parameters are invalid. + """ + if not self._is_none_or_dict(filters): + raise ValueError("filters parameter must be a dict") + self._validate_sort_parameters(sort) + if not self._is_none_or_int(skip): + raise ValueError("skip parameter must be a non-negative int") + if not (self._is_none_or_int(limit) or limit == "null"): + raise ValueError("limit parameter must be a non-negative int") + + skip = skip if skip is not None else 0 + limit = limit if limit is not None else "null" + query = f""" + FOR doc IN @@collection + {self._build_filter_conditions(filters)} + LIMIT {skip}, {limit} + {self._build_sort_expression(sort)} + RETURN doc + """ + bind_vars = {"@collection": self.name} + data: Json = {"query": query, "bindVars": bind_vars, "count": True} + headers: RequestHeaders = {} + if allow_dirty_read is not None: + if allow_dirty_read is True: + headers["x-arango-allow-dirty-read"] = "true" + else: + headers["x-arango-allow-dirty-read"] = "false" + + request = Request( + method=Method.POST, + endpoint="/_api/cursor", + data=self.serializer.dumps(data), + headers=headers, + ) + + def response_handler(resp: Response) -> Cursor: + if not resp.is_success: + raise DocumentGetError(resp, request) + if self._executor.context == "async": + # We cannot have a cursor giving back async jobs + executor: NonAsyncExecutor = DefaultApiExecutor( + self._executor.connection + ) + else: + executor = cast(NonAsyncExecutor, self._executor) + return Cursor(executor, self.deserializer.loads(resp.raw_body)) + + return await self._executor.execute(request, response_handler) + + async def update_match( + self, + filters: Json, + body: T, + limit: Optional[int | str] = None, + keep_none: Optional[bool] = None, + wait_for_sync: Optional[bool] = None, + merge_objects: Optional[bool] = None, + ) -> Result[int]: + """Update matching documents. + + Args: + filters (dict | None): Query filters. + body (dict): Full or partial document body with the updates. + limit (int | str | None): Maximum number of documents to update. + keep_none (bool | None): If set to `True`, fields with value `None` are + retained in the document. Otherwise, they are removed completely. + wait_for_sync (bool | None): Wait until operation has been synced to disk. + merge_objects (bool | None): If set to `True`, sub-dictionaries are merged + instead of the new one overwriting the old one. + + Returns: + int: Number of documents that got updated. + + Raises: + DocumentUpdateError: If update fails. 
+ """ + if not self._is_none_or_dict(filters): + raise ValueError("filters parameter must be a dict") + if not (self._is_none_or_int(limit) or limit == "null"): + raise ValueError("limit parameter must be a non-negative int") + + sync = f", waitForSync: {wait_for_sync}" if wait_for_sync is not None else "" + query = f""" + FOR doc IN @@collection + {self._build_filter_conditions(filters)} + {f"LIMIT {limit}" if limit is not None else ""} + UPDATE doc WITH @body IN @@collection + OPTIONS {{ keepNull: @keep_none, mergeObjects: @merge {sync} }} + """ # noqa: E201 E202 + bind_vars = { + "@collection": self.name, + "body": body, + "keep_none": keep_none, + "merge": merge_objects, + } + data = {"query": query, "bindVars": bind_vars} + + request = Request( + method=Method.POST, + endpoint="/_api/cursor", + data=self.serializer.dumps(data), + ) + + def response_handler(resp: Response) -> int: + if resp.is_success: + result = self.deserializer.loads(resp.raw_body) + return cast(int, result["extra"]["stats"]["writesExecuted"]) + raise DocumentUpdateError(resp, request) + + return await self._executor.execute(request, response_handler) + + async def replace_match( + self, + filters: Json, + body: T, + limit: Optional[int | str] = None, + wait_for_sync: Optional[bool] = None, + ) -> Result[int]: + """Replace matching documents. + + Args: + filters (dict | None): Query filters. + body (dict): New document body. + limit (int | str | None): Maximum number of documents to replace. + wait_for_sync (bool | None): Wait until operation has been synced to disk. + + Returns: + int: Number of documents that got replaced. + + Raises: + DocumentReplaceError: If replace fails. + """ + if not self._is_none_or_dict(filters): + raise ValueError("filters parameter must be a dict") + if not (self._is_none_or_int(limit) or limit == "null"): + raise ValueError("limit parameter must be a non-negative int") + + sync = f"waitForSync: {wait_for_sync}" if wait_for_sync is not None else "" + query = f""" + FOR doc IN @@collection + {self._build_filter_conditions(filters)} + {f"LIMIT {limit}" if limit is not None else ""} + REPLACE doc WITH @body IN @@collection + {f"OPTIONS {{ {sync} }}" if sync else ""} + """ # noqa: E201 E202 + bind_vars = { + "@collection": self.name, + "body": body, + } + data = {"query": query, "bindVars": bind_vars} + + request = Request( + method=Method.POST, + endpoint="/_api/cursor", + data=self.serializer.dumps(data), + ) + + def response_handler(resp: Response) -> int: + if resp.is_success: + result = self.deserializer.loads(resp.raw_body) + return cast(int, result["extra"]["stats"]["writesExecuted"]) + raise DocumentReplaceError(resp, request) + + return await self._executor.execute(request, response_handler) + + async def delete_match( + self, + filters: Json, + limit: Optional[int | str] = None, + wait_for_sync: Optional[bool] = None, + ) -> Result[int]: + """Delete matching documents. + + Args: + filters (dict | None): Query filters. + limit (int | str | None): Maximum number of documents to delete. + wait_for_sync (bool | None): Wait until operation has been synced to disk. + + Returns: + int: Number of documents that got deleted. + + Raises: + DocumentDeleteError: If delete fails. 
+ """ + if not self._is_none_or_dict(filters): + raise ValueError("filters parameter must be a dict") + if not (self._is_none_or_int(limit) or limit == "null"): + raise ValueError("limit parameter must be a non-negative int") + + sync = f"waitForSync: {wait_for_sync}" if wait_for_sync is not None else "" + query = f""" + FOR doc IN @@collection + {self._build_filter_conditions(filters)} + {f"LIMIT {limit}" if limit is not None else ""} + REMOVE doc IN @@collection + {f"OPTIONS {{ {sync} }}" if sync else ""} + """ # noqa: E201 E202 + bind_vars = {"@collection": self.name} + data = {"query": query, "bindVars": bind_vars} + + request = Request( + method=Method.POST, + endpoint="/_api/cursor", + data=self.serializer.dumps(data), + ) + + def response_handler(resp: Response) -> int: + if resp.is_success: + result = self.deserializer.loads(resp.raw_body) + return cast(int, result["extra"]["stats"]["writesExecuted"]) + raise DocumentDeleteError(resp, request) + + return await self._executor.execute(request, response_handler) + + async def insert_many( + self, + documents: Sequence[T], + wait_for_sync: Optional[bool] = None, + return_new: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + overwrite: Optional[bool] = None, + overwrite_mode: Optional[str] = None, + keep_null: Optional[bool] = None, + merge_objects: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + version_attribute: Optional[str] = None, + ) -> Result[Jsons]: + """Insert multiple documents. + + Note: + If inserting a document fails, the exception is not raised but + returned as an object in the "errors" list. It is up to you to + inspect the list to determine which documents were inserted + successfully (returns document metadata) and which were not + (returns exception object). + + Args: + documents (list): Documents to insert. If an item contains the "_key" or + "_id" field, the value is used as the key of the new document + (otherwise it is auto-generated). Any "_rev" field is ignored. + wait_for_sync (bool | None): Wait until documents have been synced to disk. + return_new (bool | None): Additionally return the complete new document + under the attribute `new` in the result. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. Only available if the + `overwrite` option is used. + silent (bool | None): If set to `True`, an empty object is returned as + response if all document operations succeed. No meta-data is returned + for the created documents. If any of the operations raises an error, + an array with the error object(s) is returned. + overwrite (bool | None): If set to `True`, operation does not fail on + duplicate key and existing document is overwritten (replace-insert). + overwrite_mode (str | None): Overwrite mode. Supersedes **overwrite** + option. May be one of "ignore", "replace", "update" or "conflict". + keep_null (bool | None): If set to `True`, fields with value None are + retained in the document. Otherwise, they are removed completely. + Applies only when **overwrite_mode** is set to "update" + (update-insert). + merge_objects (bool | None): If set to `True`, sub-dictionaries are merged + instead of the new one overwriting the old one. Applies only when + **overwrite_mode** is set to "update" (update-insert). + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document operations affect the edge index + or cache-enabled persistent indexes. 
+ version_attribute (str | None): Support for simple external versioning to + document operations. Only applicable if **overwrite** is set to `True` + or **overwrite_mode** is set to "update" or "replace". + + Returns: + list: Documents metadata (e.g. document id, key, revision) and + errors or just errors if **silent** is set to `True`. + + Raises: + DocumentInsertError: If insertion fails. + + References: + - `create-multiple-documents `__ + """ # noqa: E501 + params: Params = {} + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if return_new is not None: + params["returnNew"] = return_new + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if overwrite is not None: + params["overwrite"] = overwrite + if overwrite_mode is not None: + params["overwriteMode"] = overwrite_mode + if keep_null is not None: + params["keepNull"] = keep_null + if merge_objects is not None: + params["mergeObjects"] = merge_objects + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + if version_attribute is not None: + params["versionAttribute"] = version_attribute + + request = Request( + method=Method.POST, + endpoint=f"/_api/document/{self.name}", + data=self._doc_serializer.dumps(documents), + params=params, + ) + + def response_handler( + resp: Response, + ) -> Jsons: + if not resp.is_success: + raise DocumentInsertError(resp, request) + return self.deserializer.loads_many(resp.raw_body) + + return await self._executor.execute(request, response_handler) + + async def replace_many( + self, + documents: Sequence[T], + wait_for_sync: Optional[bool] = None, + ignore_revs: Optional[bool] = None, + return_new: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + version_attribute: Optional[str] = None, + ) -> Result[Jsons]: + """Insert multiple documents. + + Note: + If replacing a document fails, the exception is not raised but + returned as an object in the "errors" list. It is up to you to + inspect the list to determine which documents were replaced + successfully (returns document metadata) and which were not + (returns exception object). + + Args: + documents (list): New documents to replace the old ones. An item must + contain the "_key" or "_id" field. + wait_for_sync (bool | None): Wait until documents have been synced to disk. + ignore_revs (bool | None): If this is set to `False`, then any `_rev` + attribute given in a body document is taken as a precondition. The + document is only replaced if the current revision is the one + specified. + return_new (bool | None): Additionally return the complete new document + under the attribute `new` in the result. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. + silent (bool | None): If set to `True`, an empty object is returned as + response if all document operations succeed. No meta-data is returned + for the created documents. If any of the operations raises an error, + an array with the error object(s) is returned. + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document operations affect the edge index + or cache-enabled persistent indexes. + version_attribute (str | None): Support for simple external versioning to + document operations. + + Returns: + list: Documents metadata (e.g. 
document id, key, revision) and + errors or just errors if **silent** is set to `True`. + + Raises: + DocumentReplaceError: If replacing fails. + + References: + - `replace-multiple-documents `__ + """ # noqa: E501 + params: Params = {} + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + if return_new is not None: + params["returnNew"] = return_new + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + if version_attribute is not None: + params["versionAttribute"] = version_attribute + + request = Request( + method=Method.PUT, + endpoint=f"/_api/document/{self.name}", + data=self._doc_serializer.dumps(documents), + params=params, + ) + + def response_handler( + resp: Response, + ) -> Jsons: + if not resp.is_success: + raise DocumentReplaceError(resp, request) + return self.deserializer.loads_many(resp.raw_body) + + return await self._executor.execute(request, response_handler) + + async def update_many( + self, + documents: Sequence[T], + wait_for_sync: Optional[bool] = None, + ignore_revs: Optional[bool] = None, + return_new: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + keep_null: Optional[bool] = None, + merge_objects: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + version_attribute: Optional[str] = None, + ) -> Result[Jsons]: + """Insert multiple documents. + + Note: + If updating a document fails, the exception is not raised but + returned as an object in the "errors" list. It is up to you to + inspect the list to determine which documents were updated + successfully (returned as document metadata) and which were not + (returned as exception object). + + Args: + documents (list): Documents to update. An item must contain the "_key" or + "_id" field. + wait_for_sync (bool | None): Wait until documents have been synced to disk. + ignore_revs (bool | None): If this is set to `False`, then any `_rev` + attribute given in a body document is taken as a precondition. The + document is only updated if the current revision is the one + specified. + return_new (bool | None): Additionally return the complete new document + under the attribute `new` in the result. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. + silent (bool | None): If set to `True`, an empty object is returned as + response if all document operations succeed. No meta-data is returned + for the created documents. If any of the operations raises an error, + an array with the error object(s) is returned. + keep_null (bool | None): If set to `True`, fields with value None are + retained in the document. Otherwise, they are removed completely. + Applies only when **overwrite_mode** is set to "update" + (update-insert). + merge_objects (bool | None): If set to `True`, sub-dictionaries are merged + instead of the new one overwriting the old one. Applies only when + **overwrite_mode** is set to "update" (update-insert). + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document operations affect the edge index + or cache-enabled persistent indexes. + version_attribute (str | None): Support for simple external versioning to + document operations. + + Returns: + list: Documents metadata (e.g. 
document id, key, revision) and + errors or just errors if **silent** is set to `True`. + + Raises: + DocumentUpdateError: If update fails. + + References: + - `update-multiple-documents `__ + """ # noqa: E501 + params: Params = {} + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + if return_new is not None: + params["returnNew"] = return_new + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if keep_null is not None: + params["keepNull"] = keep_null + if merge_objects is not None: + params["mergeObjects"] = merge_objects + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + if version_attribute is not None: + params["versionAttribute"] = version_attribute + + request = Request( + method=Method.PATCH, + endpoint=f"/_api/document/{self.name}", + data=self._doc_serializer.dumps(documents), + params=params, + ) + + def response_handler( + resp: Response, + ) -> Jsons: + if not resp.is_success: + raise DocumentUpdateError(resp, request) + return self.deserializer.loads_many(resp.raw_body) + + return await self._executor.execute(request, response_handler) + + async def delete_many( + self, + documents: Sequence[T], + wait_for_sync: Optional[bool] = None, + ignore_revs: Optional[bool] = None, + return_old: Optional[bool] = None, + silent: Optional[bool] = None, + refill_index_caches: Optional[bool] = None, + ) -> Result[Jsons]: + """Delete multiple documents. + + Note: + If deleting a document fails, the exception is not raised but + returned as an object in the "errors" list. It is up to you to + inspect the list to determine which documents were deleted + successfully (returned as document metadata) and which were not + (returned as exception object). + + Args: + documents (list): Documents to delete. An item must contain the "_key" or + "_id" field. + wait_for_sync (bool | None): Wait until documents have been synced to disk. + ignore_revs (bool | None): If this is set to `False`, then any `_rev` + attribute given in a body document is taken as a precondition. The + document is only updated if the current revision is the one + specified. + return_old (bool | None): Additionally return the complete old document + under the attribute `old` in the result. + silent (bool | None): If set to `True`, an empty object is returned as + response if all document operations succeed. No meta-data is returned + for the created documents. If any of the operations raises an error, + an array with the error object(s) is returned. + refill_index_caches (bool | None): Whether to add new entries to + in-memory index caches if document operations affect the edge index + or cache-enabled persistent indexes. + + Returns: + list: Documents metadata (e.g. document id, key, revision) and + errors or just errors if **silent** is set to `True`. + + Raises: + DocumentRemoveError: If removal fails. 
+ + References: + - `remove-multiple-documents `__ + """ # noqa: E501 + params: Params = {} + if wait_for_sync is not None: + params["waitForSync"] = wait_for_sync + if ignore_revs is not None: + params["ignoreRevs"] = ignore_revs + if return_old is not None: + params["returnOld"] = return_old + if silent is not None: + params["silent"] = silent + if refill_index_caches is not None: + params["refillIndexCaches"] = refill_index_caches + + request = Request( + method=Method.DELETE, + endpoint=f"/_api/document/{self.name}", + data=self._doc_serializer.dumps(documents), + params=params, + ) + + def response_handler( + resp: Response, + ) -> Jsons: + if not resp.is_success: + raise DocumentDeleteError(resp, request) + return self.deserializer.loads_many(resp.raw_body) + + return await self._executor.execute(request, response_handler) diff --git a/arangoasync/exceptions.py b/arangoasync/exceptions.py index 8e3919f..1274df2 100644 --- a/arangoasync/exceptions.py +++ b/arangoasync/exceptions.py @@ -187,6 +187,10 @@ class ClientConnectionError(ArangoClientError): """The request was unable to reach the server.""" +class CollectionTruncateError(ArangoServerError): + """Failed to truncate collection.""" + + class CursorCloseError(ArangoServerError): """Failed to delete the cursor result from server.""" @@ -227,6 +231,14 @@ class DeserializationError(ArangoClientError): """Failed to deserialize the server response.""" +class DocumentCountError(ArangoServerError): + """Failed to retrieve document count.""" + + +class DocumentDeleteError(ArangoServerError): + """Failed to delete document.""" + + class DocumentGetError(ArangoServerError): """Failed to retrieve document.""" @@ -239,10 +251,18 @@ class DocumentParseError(ArangoClientError): """Failed to parse document input.""" +class DocumentReplaceError(ArangoServerError): + """Failed to replace document.""" + + class DocumentRevisionError(ArangoServerError): """The expected and actual document revisions mismatched.""" +class DocumentUpdateError(ArangoServerError): + """Failed to update document.""" + + class IndexCreateError(ArangoServerError): """Failed to create collection index.""" @@ -307,6 +327,10 @@ class ServerVersionError(ArangoServerError): """Failed to retrieve server version.""" +class SortValidationError(ArangoClientError): + """Invalid sort parameters.""" + + class TransactionAbortError(ArangoServerError): """Failed to abort transaction.""" diff --git a/arangoasync/serialization.py b/arangoasync/serialization.py index d17a2cd..f12f81a 100644 --- a/arangoasync/serialization.py +++ b/arangoasync/serialization.py @@ -9,7 +9,7 @@ import json from abc import ABC, abstractmethod -from typing import Generic, TypeVar +from typing import Generic, Sequence, TypeVar from arangoasync.exceptions import DeserializationError, SerializationError from arangoasync.typings import Json, Jsons @@ -26,7 +26,7 @@ class Serializer(ABC, Generic[T]): # pragma: no cover """ @abstractmethod - def dumps(self, data: T) -> str: + def dumps(self, data: T | Sequence[T | str]) -> str: """Serialize any generic data. 
Args: @@ -87,7 +87,7 @@ def loads_many(self, data: bytes) -> U: class JsonSerializer(Serializer[Json]): """JSON serializer.""" - def dumps(self, data: T) -> str: + def dumps(self, data: Json | Sequence[str | Json]) -> str: try: return json.dumps(data, separators=(",", ":")) except Exception as e: @@ -104,10 +104,7 @@ def loads(self, data: bytes) -> Json: raise DeserializationError("Failed to deserialize data from JSON.") from e def loads_many(self, data: bytes) -> Jsons: - try: - return json.loads(data) # type: ignore[no-any-return] - except Exception as e: - raise DeserializationError("Failed to deserialize data from JSON.") from e + return self.loads(data) # type: ignore[return-value] DefaultSerializer = JsonSerializer diff --git a/tests/test_collection.py b/tests/test_collection.py index 72f6583..d9214dd 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -5,6 +5,8 @@ from arangoasync.errno import DATA_SOURCE_NOT_FOUND, INDEX_NOT_FOUND from arangoasync.exceptions import ( CollectionPropertiesError, + CollectionTruncateError, + DocumentCountError, IndexCreateError, IndexDeleteError, IndexGetError, @@ -157,3 +159,26 @@ async def test_collection_index(doc_col, bad_col, cluster): await doc_col.delete_index(idx1.id) assert err.value.error_code == INDEX_NOT_FOUND assert await doc_col.delete_index(idx2.id, ignore_missing=True) is False + + +@pytest.mark.asyncio +async def test_collection_truncate_count(docs, doc_col, bad_col): + # Test errors + with pytest.raises(CollectionTruncateError): + await bad_col.truncate() + with pytest.raises(DocumentCountError): + await bad_col.count() + + # Test regular operations + await asyncio.gather(*[doc_col.insert(doc) for doc in docs]) + cnt = await doc_col.count() + assert cnt == len(docs) + + await doc_col.truncate() + cnt = await doc_col.count() + assert cnt == 0 + + await asyncio.gather(*[doc_col.insert(doc) for doc in docs]) + await doc_col.truncate(wait_for_sync=True, compact=True) + cnt = await doc_col.count() + assert cnt == 0 diff --git a/tests/test_document.py b/tests/test_document.py index bb49496..ef84101 100644 --- a/tests/test_document.py +++ b/tests/test_document.py @@ -1,11 +1,22 @@ +import asyncio + import pytest -from arangoasync.exceptions import DocumentParseError +from arangoasync.exceptions import ( + DocumentDeleteError, + DocumentGetError, + DocumentInsertError, + DocumentParseError, + DocumentReplaceError, + DocumentRevisionError, + DocumentUpdateError, + SortValidationError, +) from tests.helpers import generate_col_name @pytest.mark.asyncio -async def test_document_insert(doc_col, docs): +async def test_document_insert(doc_col, bad_col, docs): # Test insert document with no key result = await doc_col.insert({}) assert await doc_col.get(result["_key"]) is not None @@ -22,6 +33,9 @@ async def test_document_insert(doc_col, docs): await doc_col.insert({"_id": f"{generate_col_name()}/foo"}) assert "Bad collection name" in err.value.message + with pytest.raises(DocumentInsertError): + await bad_col.insert({}) + # Test insert with default options for doc in docs: result = await doc_col.insert(doc) @@ -29,3 +43,513 @@ async def test_document_insert(doc_col, docs): assert result["_key"] == doc["_key"] assert isinstance(result["_rev"], str) assert (await doc_col.get(doc["_key"]))["val"] == doc["val"] + + +@pytest.mark.asyncio +async def test_document_update(doc_col, bad_col, docs): + # Test updating a non-existent document + with pytest.raises(DocumentUpdateError): + await bad_col.update({"_key": "non-existent", "val": 
42}) + + # Verbose update + doc = docs[0] + assert doc["val"] != 42 + await doc_col.insert(doc) + doc["val"] = 42 + updated = await doc_col.update(doc, return_old=True, return_new=True) + assert updated["_key"] == doc["_key"] + assert "old" in updated + assert "new" in updated + new_value = await doc_col.get(doc) + assert new_value["val"] == doc["val"] + + # Silent update + doc["val"] = None + updated = await doc_col.update(doc, silent=True, keep_null=False) + assert updated is True + new_value = await doc_col.get(doc) + assert "val" not in new_value + + # Test wrong revision + with pytest.raises(DocumentRevisionError): + await doc_col.update(new_value, if_match="foobar") + + # Update using correct revision + doc["val"] = 24 + result = await doc_col.update(new_value, silent=True, if_match=new_value["_rev"]) + assert result is True + + +@pytest.mark.asyncio +async def test_document_replace(doc_col, bad_col, docs): + # Test updating a non-existent document + with pytest.raises(DocumentReplaceError): + await bad_col.replace({"_key": "non-existent", "val": 42}) + # Verbose replace + doc = docs[0] + assert doc["val"] != 42 + await doc_col.insert(doc) + doc["val"] = 42 + doc.pop("loc") + doc.pop("text") + replaced = await doc_col.replace(doc, return_old=True, return_new=True) + assert replaced["_key"] == doc["_key"] + new_value = await doc_col.get(doc) + assert new_value["val"] == doc["val"] + assert "text" not in new_value + assert "loc" not in new_value + assert "new" in replaced + assert "old" in replaced + + # Silent replace + doc["text"] = "abcd" + doc["new_entry"] = 3.14 + replaced = await doc_col.replace(doc, silent=True) + assert replaced is True + new_value = await doc_col.get(doc) + assert new_value["text"] == doc["text"] + assert new_value["new_entry"] == doc["new_entry"] + + # Test wrong revision + with pytest.raises(DocumentRevisionError): + await doc_col.replace(new_value, if_match="foobar") + + # Replace using correct revision + doc["foo"] = "foobar" + result = await doc_col.replace(new_value, silent=True, if_match=new_value["_rev"]) + assert result is True + + +@pytest.mark.asyncio +async def test_document_delete(doc_col, bad_col, docs): + # Test deleting a non-existent document + with pytest.raises(DocumentDeleteError): + await bad_col.delete({"_key": "non-existent"}) + deleted = await doc_col.delete({"_key": "non-existent"}, ignore_missing=True) + assert deleted is False + + # Verbose delete + doc = docs[0] + inserted = await doc_col.insert(doc) + deleted = await doc_col.delete(doc, return_old=True) + assert deleted["_key"] == inserted["_key"] + + # Silent delete + await doc_col.insert(doc) + deleted = await doc_col.delete(doc, silent=True, ignore_missing=True) + assert deleted is True + + # Test wrong revision + inserted = await doc_col.insert(doc) + with pytest.raises(DocumentRevisionError): + await doc_col.delete(inserted, if_match="foobar") + + # Delete using correct revision + deleted = await doc_col.delete(doc, silent=True, if_match=inserted["_rev"]) + assert deleted is True + + +@pytest.mark.asyncio +async def test_document_get(doc_col, bad_col, docs): + # Test getting a non-existent document + with pytest.raises(DocumentGetError): + await bad_col.get({"_key": "non-existent"}) + result = await doc_col.get({"_key": "non-existent"}) + assert result is None + + doc = docs[0] + inserted = await doc_col.insert(doc) + result = await doc_col.get(doc) + assert result["_key"] == inserted["_key"] + + # Test with good revision + result = await doc_col.get(inserted["_key"], 
if_match=inserted["_rev"]) + assert result["_key"] == inserted["_key"] + + # Test with non-matching revision + result = await doc_col.get(inserted["_id"], if_none_match="foobar") + assert result["_key"] == inserted["_key"] + + # Test with incorrect revision + with pytest.raises(DocumentGetError): + await doc_col.get(inserted["_id"], if_none_match=inserted["_rev"]) + with pytest.raises(DocumentRevisionError): + await doc_col.get(inserted["_id"], if_match="foobar") + + +@pytest.mark.asyncio +async def test_document_has(doc_col, bad_col, docs): + # Test getting a non-existent document + result = await bad_col.has({"_key": "non-existent"}) + assert result is False + result = await doc_col.has({"_key": "non-existent"}) + assert result is False + + doc = docs[0] + inserted = await doc_col.insert(doc) + result = await doc_col.has(doc) + assert result is True + + # Test with good revision + result = await doc_col.has(inserted["_key"], if_match=inserted["_rev"]) + assert result is True + + # Test with non-matching revision + result = await doc_col.has(inserted["_id"], if_none_match="foobar") + assert result is True + + # Test with incorrect revision + with pytest.raises(DocumentGetError): + await doc_col.has(inserted["_id"], if_none_match=inserted["_rev"]) + with pytest.raises(DocumentRevisionError): + await doc_col.has(inserted["_id"], if_match="foobar") + + +@pytest.mark.asyncio +async def test_document_get_many(doc_col, bad_col, docs): + # Test with invalid collection + with pytest.raises(DocumentGetError): + await bad_col.get_many(["non-existent"]) + + # Insert all documents first + await asyncio.gather(*[doc_col.insert(doc) for doc in docs]) + + # Test with good keys + many = await doc_col.get_many([doc["_key"] for doc in docs]) + assert len(many) == len(docs) + + # Test with partially good keys + keys = [doc["_key"] for doc in docs] + keys.append("invalid_key") + many = await doc_col.get_many(keys) + assert len(many) == len(keys) + assert "error" in many[-1] + + # Test with full documents + many = await doc_col.get_many(docs) + assert len(many) == len(docs) + + # Revs + bad_rev = many + bad_rev[0]["_rev"] = "foobar" + many = await doc_col.get_many([bad_rev[0]], ignore_revs=True) + assert len(many) == 1 + assert "error" not in many[0] + many = await doc_col.get_many([bad_rev[0]], ignore_revs=False) + assert len(many) == 1 + assert "error" in many[0] + + # Empty list + many = await doc_col.get_many([]) + assert len(many) == 0 + + +@pytest.mark.asyncio +async def test_document_find(doc_col, bad_col, docs): + # Check errors first + with pytest.raises(DocumentGetError): + await bad_col.find() + with pytest.raises(ValueError): + await doc_col.find(limit=-1) + with pytest.raises(ValueError): + await doc_col.find(skip="abcd") + with pytest.raises(ValueError): + await doc_col.find(filters="abcd") + with pytest.raises(SortValidationError): + await doc_col.find(sort="abcd") + with pytest.raises(SortValidationError): + await doc_col.find(sort=[{"x": "text", "sort_order": "ASC"}]) + + # Insert all documents + await asyncio.gather(*[doc_col.insert(doc) for doc in docs]) + + # Empty find + filter_docs = [] + async for doc in await doc_col.find(): + filter_docs.append(doc) + assert len(filter_docs) == len(docs) + + # Test with filter + filter_docs = [] + async for doc in await doc_col.find(filters={"val": 42}): + filter_docs.append(doc) + assert len(filter_docs) == 0 + async for doc in await doc_col.find(filters={"text": "foo"}): + filter_docs.append(doc) + assert len(filter_docs) == 3 + filter_docs = [] + 
async for doc in await doc_col.find(filters={"text": "foo", "val": 1}): + filter_docs.append(doc) + assert len(filter_docs) == 1 + + # Test with limit + filter_docs = [] + async for doc in await doc_col.find(limit=2): + filter_docs.append(doc) + assert len(filter_docs) == 2 + + # Test with skip + filter_docs = [] + async for doc in await doc_col.find(skip=2, allow_dirty_read=True): + filter_docs.append(doc) + assert len(filter_docs) == len(docs) - 2 + + # Test with sort + filter_docs = [] + async for doc in await doc_col.find( + {}, sort=[{"sort_by": "text", "sort_order": "ASC"}] + ): + filter_docs.append(doc) + + for idx in range(len(filter_docs) - 1): + assert filter_docs[idx]["text"] <= filter_docs[idx + 1]["text"] + + +@pytest.mark.asyncio +async def test_document_insert_many(doc_col, bad_col, docs): + # Check errors + with pytest.raises(DocumentInsertError): + await bad_col.insert_many(docs) + + # Insert all documents + result = await doc_col.insert_many(docs, return_new=True) + assert len(result) == len(docs) + for res in result: + assert "error" not in res + + # Empty list + result = await doc_col.insert_many([]) + assert len(result) == 0 + + # Insert again (should not work due to unique constraint) + result = await doc_col.insert_many(docs) + assert len(result) == len(docs) + for res in result: + assert "error" in res + + # Silent mode + result = await doc_col.insert_many(docs, silent=True) + assert len(result) == len(docs) + for res in result: + assert "error" in res + await doc_col.truncate() + result = await doc_col.insert_many(docs, silent=True) + assert len(result) == 0 + + +@pytest.mark.asyncio +async def test_document_replace_many(doc_col, bad_col, docs): + # Check errors + with pytest.raises(DocumentReplaceError): + await bad_col.replace_many(docs) + + # Empty list + result = await doc_col.replace_many([]) + assert len(result) == 0 + + # Replace "not found" documents + result = await doc_col.replace_many(docs, return_new=True) + assert len(result) == len(docs) + for res in result: + assert "error" in res + + # Replace successfully + result = await doc_col.insert_many(docs, return_new=True) + replacements = [] + for doc in result: + replacements.append({"_key": doc["_key"], "val": 42}) + result = await doc_col.replace_many(replacements, return_new=True) + assert len(result) == len(docs) + for doc in result: + assert doc["new"]["val"] == 42 + assert "text" not in doc["new"] + + # Silent mode + result = await doc_col.replace_many(docs, silent=True) + assert len(result) == 0 + await doc_col.truncate() + result = await doc_col.replace_many(docs, silent=True) + assert len(result) == len(docs) + for res in result: + assert "error" in res + + +@pytest.mark.asyncio +async def test_document_update_many(doc_col, bad_col, docs): + # Check errors + with pytest.raises(DocumentUpdateError): + await bad_col.update_many(docs) + + # Empty list + result = await doc_col.update_many([]) + assert len(result) == 0 + + # Update "not found" documents + result = await doc_col.update_many(docs, return_new=True) + assert len(result) == len(docs) + for res in result: + assert "error" in res + + # Update successfully + result = await doc_col.insert_many(docs, return_new=True) + updates = [] + for doc in result: + updates.append({"_key": doc["_key"], "val": 42}) + result = await doc_col.update_many(updates, return_new=True) + assert len(result) == len(docs) + for doc in result: + assert doc["new"]["val"] == 42 + assert "text" in doc["new"] + + # Silent mode + result = await doc_col.update_many(docs, 
+
+
+@pytest.mark.asyncio
+async def test_document_update_many(doc_col, bad_col, docs):
+    # Check errors
+    with pytest.raises(DocumentUpdateError):
+        await bad_col.update_many(docs)
+
+    # Empty list
+    result = await doc_col.update_many([])
+    assert len(result) == 0
+
+    # Update "not found" documents
+    result = await doc_col.update_many(docs, return_new=True)
+    assert len(result) == len(docs)
+    for res in result:
+        assert "error" in res
+
+    # Update successfully
+    result = await doc_col.insert_many(docs, return_new=True)
+    updates = []
+    for doc in result:
+        updates.append({"_key": doc["_key"], "val": 42})
+    result = await doc_col.update_many(updates, return_new=True)
+    assert len(result) == len(docs)
+    for doc in result:
+        assert doc["new"]["val"] == 42
+        assert "text" in doc["new"]
+
+    # Silent mode
+    result = await doc_col.update_many(docs, silent=True)
+    assert len(result) == 0
+    await doc_col.truncate()
+    result = await doc_col.update_many(docs, silent=True)
+    assert len(result) == len(docs)
+    for res in result:
+        assert "error" in res
+
+
+@pytest.mark.asyncio
+async def test_document_delete_many(doc_col, bad_col, docs):
+    # Check errors
+    with pytest.raises(DocumentDeleteError):
+        await bad_col.delete_many(docs)
+
+    # Empty list
+    result = await doc_col.delete_many([])
+    assert len(result) == 0
+
+    # Delete "not found" documents
+    result = await doc_col.delete_many(docs, return_old=True)
+    assert len(result) == len(docs)
+    for res in result:
+        assert "error" in res
+
+    # Delete successfully
+    result = await doc_col.insert_many(docs, return_new=True)
+    deleted = []
+    for doc in result:
+        deleted.append({"_key": doc["_key"], "val": 42})
+    result = await doc_col.delete_many(deleted, return_old=True)
+    assert len(result) == len(docs)
+
+    # Wrong and right rev
+    result = await doc_col.insert_many(docs, return_new=True)
+    deleted = [result[0]["new"], result[1]["new"]]
+    deleted[1]["_rev"] = "foobar"
+    result = await doc_col.delete_many(deleted, ignore_revs=False)
+    assert "_key" in result[0]
+    assert "error" in result[1]
+
+    # Silent mode
+    await doc_col.truncate()
+    _ = await doc_col.insert_many(docs)
+    result = await doc_col.delete_many(docs, silent=True)
+    assert len(result) == 0
+    result = await doc_col.delete_many(docs, silent=True)
+    assert len(result) == len(docs)
+    for res in result:
+        assert "error" in res
+
+
+@pytest.mark.asyncio
+async def test_document_update_match(doc_col, bad_col, docs):
+    # Check errors first
+    with pytest.raises(DocumentUpdateError):
+        await bad_col.update_match({}, {})
+    with pytest.raises(ValueError):
+        await doc_col.update_match({}, {}, limit=-1)
+    with pytest.raises(ValueError):
+        await doc_col.update_match("abcd", {})
+
+    # Insert all documents
+    await doc_col.insert_many(docs)
+
+    # Update value for all documents
+    count = await doc_col.update_match({}, {"val": 42})
+    async for doc in await doc_col.find():
+        assert doc["val"] == 42
+    assert count == len(docs)
+
+    # Update documents partially
+    count = await doc_col.update_match({"text": "foo"}, {"val": 24})
+    async for doc in await doc_col.find():
+        if doc["text"] == "foo":
+            assert doc["val"] == 24
+    assert count == sum([1 for doc in docs if doc["text"] == "foo"])
+
+    # No matching documents
+    count = await doc_col.update_match({"text": "no_matching"}, {"val": -1})
+    async for doc in await doc_col.find():
+        assert doc["val"] != -1
+    assert count == 0
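Note: update_match is pinned down here purely through its observable effects.
Conceptually it behaves like a filtered AQL UPDATE that returns the affected
count; a rough equivalent under that assumption (the exact query string the
driver builds may differ, `db` is a database handle obtained as in the earlier
sketch, and "students" is an illustrative collection name):

    cursor = await db.aql.execute(
        "FOR doc IN @@col FILTER doc.`text` == @text "
        "UPDATE doc WITH @body IN @@col "
        "COLLECT WITH COUNT INTO updated RETURN updated",
        bind_vars={"@col": "students", "text": "foo", "body": {"val": 24}},
    )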
+
+
+@pytest.mark.asyncio
+async def test_document_replace_match(doc_col, bad_col, docs):
+    # Check errors first
+    with pytest.raises(DocumentReplaceError):
+        await bad_col.replace_match({}, {})
+    with pytest.raises(ValueError):
+        await doc_col.replace_match({}, {}, limit=-1)
+    with pytest.raises(ValueError):
+        await doc_col.replace_match("abcd", {})
+
+    # Replace all documents
+    await doc_col.insert_many(docs)
+    count = await doc_col.replace_match({}, {"replacement": 42})
+    async for doc in await doc_col.find():
+        assert "replacement" in doc
+        assert "val" not in doc
+    assert count == len(docs)
+    await doc_col.truncate()
+
+    # Replace documents partially
+    await doc_col.insert_many(docs)
+    count = await doc_col.replace_match({"text": "foo"}, {"replacement": 24})
+    async for doc in await doc_col.find():
+        if doc.get("text") == "bar":
+            assert "replacement" not in doc
+        else:
+            assert "replacement" in doc
+    assert count == sum([1 for doc in docs if doc["text"] == "foo"])
+    await doc_col.truncate()
+
+    # No matching documents
+    await doc_col.insert_many(docs)
+    count = await doc_col.replace_match({"text": "no_matching"}, {"val": -1})
+    async for doc in await doc_col.find():
+        assert doc["val"] != -1
+    assert count == 0
+
+
+@pytest.mark.asyncio
+async def test_document_delete_match(doc_col, bad_col, docs):
+    # Check errors first
+    with pytest.raises(DocumentDeleteError):
+        await bad_col.delete_match({})
+    with pytest.raises(ValueError):
+        await doc_col.delete_match({}, limit=-1)
+    with pytest.raises(ValueError):
+        await doc_col.delete_match("abcd")
+
+    # Delete all documents
+    await doc_col.insert_many(docs)
+    count = await doc_col.delete_match({})
+    assert count == len(docs)
+    assert await doc_col.count() == 0
+
+    # Delete documents partially
+    await doc_col.insert_many(docs)
+    count = await doc_col.delete_match({"text": "foo"})
+    async for doc in await doc_col.find():
+        assert doc["text"] != "foo"
+    assert count == sum([1 for doc in docs if doc["text"] == "foo"])
+    await doc_col.truncate()
+
+    # No matching documents
+    await doc_col.insert_many(docs)
+    count = await doc_col.delete_match({"text": "no_matching"})
+    assert count == 0
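Note: read together, the `_match` helpers share one contract: each returns the
number of documents affected, which is what every `count ==` assertion above
checks. A condensed usage sketch, assuming a StandardCollection `col`
populated with illustrative documents:

    n = await col.update_match({}, {"val": 42})          # merge into every doc
    n = await col.replace_match({"val": 42}, {"x": 1})   # overwrite all matches
    n = await col.delete_match({"x": 1})                 # remove all matches
    assert await col.count() == 0                        # holds if every doc matched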