Skip to content

3.10: Read from Followers (allow dirty read) #222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion arango/aql.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def execute(
skip_inaccessible_cols: Optional[bool] = None,
max_runtime: Optional[Number] = None,
fill_block_cache: Optional[bool] = None,
allow_dirty_read: bool = False,
) -> Result[Cursor]:
"""Execute the query and return the result cursor.

Expand Down Expand Up @@ -358,6 +359,8 @@ def execute(
query will not make it into the RocksDB block cache if not already
in there, thus leaving more room for the actual hot set.
:type fill_block_cache: bool
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
Copy link
Contributor

@joowani joowani Oct 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: allow_dirty_read: bool. Same for others.

:return: Result cursor.
:rtype: arango.cursor.Cursor
:raise arango.exceptions.AQLQueryExecuteError: If execute fails.
Expand Down Expand Up @@ -408,7 +411,12 @@ def execute(
data["options"] = options
data.update(options)

request = Request(method="post", endpoint="/_api/cursor", data=data)
request = Request(
method="post",
endpoint="/_api/cursor",
data=data,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Cursor:
if not resp.is_success:
Expand Down
55 changes: 51 additions & 4 deletions arango/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def has(
document: Union[str, Json],
rev: Optional[str] = None,
check_rev: bool = True,
allow_dirty_read: bool = False,
) -> Result[bool]:
"""Check if a document exists in the collection.

Expand All @@ -523,13 +524,18 @@ def has(
:param check_rev: If set to True, revision of **document** (if given)
is compared against the revision of target document.
:type check_rev: bool
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:return: True if document exists, False otherwise.
:rtype: bool
:raise arango.exceptions.DocumentInError: If check fails.
:raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
"""
handle, body, headers = self._prep_from_doc(document, rev, check_rev)

if allow_dirty_read:
headers["x-arango-allow-dirty-read"] = "true"

request = Request(
method="get",
endpoint=f"/_api/document/{handle}",
Expand Down Expand Up @@ -662,7 +668,11 @@ def response_handler(resp: Response) -> Cursor:
return self._execute(request, response_handler)

def find_near(
self, latitude: Number, longitude: Number, limit: Optional[int] = None
self,
latitude: Number,
longitude: Number,
limit: Optional[int] = None,
allow_dirty_read: bool = False,
) -> Result[Cursor]:
"""Return documents near a given coordinate.

Expand All @@ -677,6 +687,8 @@ def find_near(
:type longitude: int | float
:param limit: Max number of documents returned.
:type limit: int | None
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:returns: Document cursor.
:rtype: arango.cursor.Cursor
:raises arango.exceptions.DocumentGetError: If retrieval fails.
Expand Down Expand Up @@ -705,6 +717,7 @@ def find_near(
endpoint="/_api/cursor",
data={"query": query, "bindVars": bind_vars, "count": True},
read=self.name,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Cursor:
Expand All @@ -721,6 +734,7 @@ def find_in_range(
upper: int,
skip: Optional[int] = None,
limit: Optional[int] = None,
allow_dirty_read: bool = False,
) -> Result[Cursor]:
"""Return documents within a given range in a random order.

Expand All @@ -736,6 +750,8 @@ def find_in_range(
:type skip: int | None
:param limit: Max number of documents returned.
:type limit: int | None
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:returns: Document cursor.
:rtype: arango.cursor.Cursor
:raises arango.exceptions.DocumentGetError: If retrieval fails.
Expand Down Expand Up @@ -764,6 +780,7 @@ def find_in_range(
endpoint="/_api/cursor",
data={"query": query, "bindVars": bind_vars, "count": True},
read=self.name,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Cursor:
Expand All @@ -779,6 +796,7 @@ def find_in_radius(
longitude: Number,
radius: Number,
distance_field: Optional[str] = None,
allow_dirty_read: bool = False,
) -> Result[Cursor]:
"""Return documents within a given radius around a coordinate.

Expand All @@ -793,6 +811,8 @@ def find_in_radius(
:param distance_field: Document field used to indicate the distance to
the given coordinate. This parameter is ignored in transactions.
:type distance_field: str
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:returns: Document cursor.
:rtype: arango.cursor.Cursor
:raises arango.exceptions.DocumentGetError: If retrieval fails.
Expand Down Expand Up @@ -823,6 +843,7 @@ def find_in_radius(
endpoint="/_api/cursor",
data={"query": query, "bindVars": bind_vars, "count": True},
read=self.name,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Cursor:
Expand Down Expand Up @@ -899,7 +920,11 @@ def response_handler(resp: Response) -> Cursor:
return self._execute(request, response_handler)

def find_by_text(
self, field: str, query: str, limit: Optional[int] = None
self,
field: str,
query: str,
limit: Optional[int] = None,
allow_dirty_read: bool = False,
) -> Result[Cursor]:
"""Return documents that match the given fulltext query.

Expand All @@ -909,6 +934,8 @@ def find_by_text(
:type query: str
:param limit: Max number of documents returned.
:type limit: int | None
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:returns: Document cursor.
:rtype: arango.cursor.Cursor
:raises arango.exceptions.DocumentGetError: If retrieval fails.
Expand All @@ -935,6 +962,7 @@ def find_by_text(
endpoint="/_api/cursor",
data={"query": aql, "bindVars": bind_vars, "count": True},
read=self.name,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Cursor:
Expand All @@ -944,12 +972,18 @@ def response_handler(resp: Response) -> Cursor:

return self._execute(request, response_handler)

def get_many(self, documents: Sequence[Union[str, Json]]) -> Result[List[Json]]:
def get_many(
self,
documents: Sequence[Union[str, Json]],
allow_dirty_read: bool = False,
) -> Result[List[Json]]:
"""Return multiple documents ignoring any missing ones.

:param documents: List of document keys, IDs or bodies. Document bodies
must contain the "_id" or "_key" fields.
:type documents: [str | dict]
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:return: Documents. Missing ones are not included.
:rtype: [dict]
:raise arango.exceptions.DocumentGetError: If retrieval fails.
Expand All @@ -964,6 +998,7 @@ def get_many(self, documents: Sequence[Union[str, Json]]) -> Result[List[Json]]:
params=params,
data=handles,
read=self.name,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> List[Json]:
Expand Down Expand Up @@ -2054,6 +2089,7 @@ def get(
document: Union[str, Json],
rev: Optional[str] = None,
check_rev: bool = True,
allow_dirty_read: bool = False,
) -> Result[Optional[Json]]:
"""Return a document.

Expand All @@ -2066,13 +2102,18 @@ def get(
:param check_rev: If set to True, revision of **document** (if given)
is compared against the revision of target document.
:type check_rev: bool
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:return: Document, or None if not found.
:rtype: dict | None
:raise arango.exceptions.DocumentGetError: If retrieval fails.
:raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
"""
handle, body, headers = self._prep_from_doc(document, rev, check_rev)

if allow_dirty_read:
headers["x-arango-allow-dirty-read"] = "true"

request = Request(
method="get",
endpoint=f"/_api/document/{handle}",
Expand Down Expand Up @@ -3075,7 +3116,10 @@ def link(
return self.insert(edge, sync=sync, silent=silent, return_new=return_new)

def edges(
self, vertex: Union[str, Json], direction: Optional[str] = None
self,
vertex: Union[str, Json],
direction: Optional[str] = None,
allow_dirty_read: bool = False,
) -> Result[Json]:
"""Return the edge documents coming in and/or out of the vertex.

Expand All @@ -3084,6 +3128,8 @@ def edges(
:param direction: The direction of the edges. Allowed values are "in"
and "out". If not set, edges in both directions are returned.
:type direction: str
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:return: List of edges and statistics.
:rtype: dict
:raise arango.exceptions.EdgeListError: If retrieval fails.
Expand All @@ -3097,6 +3143,7 @@ def edges(
endpoint=f"/_api/edges/{self.name}",
params=params,
read=self.name,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Json:
Expand Down
10 changes: 9 additions & 1 deletion arango/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def execute_transaction(
allow_implicit: Optional[bool] = None,
intermediate_commit_count: Optional[int] = None,
intermediate_commit_size: Optional[int] = None,
allow_dirty_read: bool = False,
) -> Result[Any]:
"""Execute raw Javascript command in transaction.

Expand Down Expand Up @@ -256,6 +257,8 @@ def execute_transaction(
:param intermediate_commit_size: Max size of operations in bytes after
which an intermediate commit is performed automatically.
:type intermediate_commit_size: int | None
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:return: Return value of **command**.
:rtype: Any
:raise arango.exceptions.TransactionExecuteError: If execution fails.
Expand All @@ -282,7 +285,12 @@ def execute_transaction(
if intermediate_commit_size is not None:
data["intermediateCommitSize"] = intermediate_commit_size

request = Request(method="post", endpoint="/_api/transaction", data=data)
request = Request(
method="post",
endpoint="/_api/transaction",
data=data,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)

def response_handler(resp: Response) -> Any:
if not resp.is_success:
Expand Down
21 changes: 19 additions & 2 deletions arango/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class TransactionApiExecutor:
:type lock_timeout: int
:param max_size: Max transaction size in bytes.
:type max_size: int
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
"""

def __init__(
Expand All @@ -305,6 +307,7 @@ def __init__(
allow_implicit: Optional[bool] = None,
lock_timeout: Optional[int] = None,
max_size: Optional[int] = None,
allow_dirty_read: bool = False,
) -> None:
self._conn = connection

Expand All @@ -326,7 +329,12 @@ def __init__(
if max_size is not None:
data["maxTransactionSize"] = max_size

request = Request(method="post", endpoint="/_api/transaction/begin", data=data)
request = Request(
method="post",
endpoint="/_api/transaction/begin",
data=data,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)
resp = self._conn.send_request(request)

if not resp.is_success:
Expand All @@ -348,16 +356,25 @@ def id(self) -> str:
"""
return self._id

def execute(self, request: Request, response_handler: Callable[[Response], T]) -> T:
def execute(
self,
request: Request,
response_handler: Callable[[Response], T],
allow_dirty_read: bool = False,
) -> T:
"""Execute API request in a transaction and return the result.

:param request: HTTP request.
:type request: arango.request.Request
:param response_handler: HTTP response handler.
:type response_handler: callable
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:return: API execution result.
"""
request.headers["x-arango-trx-id"] = self._id
if allow_dirty_read:
request.headers["x-arango-allow-dirty-read"] = "true"
resp = self._conn.send_request(request)
return response_handler(resp)

Expand Down