From 0f0a75cc17d07915641843062188b86a6090b7bf Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Wed, 15 Sep 2021 12:34:53 +0200 Subject: [PATCH 1/3] Send BEGIN eagerly on transaction begin --- neo4j/io/__init__.py | 2 ++ neo4j/io/_common.py | 37 +++++++++++++++++++++++++++++ neo4j/work/result.py | 49 ++++----------------------------------- neo4j/work/transaction.py | 15 +++++++----- 4 files changed, 52 insertions(+), 51 deletions(-) diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 65f120cc..6aabeea8 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -29,6 +29,7 @@ __all__ = [ "Bolt", "BoltPool", + "ConnectionErrorHandler", "Neo4jPool", "check_supported_server_product", ] @@ -92,6 +93,7 @@ ) from neo4j.io._common import ( CommitResponse, + ConnectionErrorHandler, Inbox, InitResponse, Outbox, diff --git a/neo4j/io/_common.py b/neo4j/io/_common.py index 0dc8b2a3..f92fd19b 100644 --- a/neo4j/io/_common.py +++ b/neo4j/io/_common.py @@ -139,6 +139,43 @@ def view(self): return memoryview(self._data[:end]) +class ConnectionErrorHandler: + """ + Wrapper class for handling connection errors. + + The class will wrap each method to invoke a callback if the method raises + Neo4jError, SessionExpired, or ServiceUnavailable. + The error will be re-raised after the callback. + """ + + def __init__(self, connection, on_error): + """ + :param connection the connection object to warp + :type connection Bolt + :param on_error the function to be called when a method of + connection raises of of the caught errors. + :type on_error callable + """ + self.__connection = connection + self.__on_error = on_error + + def __getattr__(self, item): + connection_attr = getattr(self.__connection, item) + if not callable(connection_attr): + return connection_attr + + def outer(func): + def inner(*args, **kwargs): + try: + func(*args, **kwargs) + except (Neo4jError, ServiceUnavailable, SessionExpired) as exc: + self.__on_error(exc) + raise + return inner + + return outer(connection_attr) + + class Response: """ Subscriber object for a full response (zero or more detail messages followed by one summary message). diff --git a/neo4j/work/result.py b/neo4j/work/result.py index de024660..bd183452 100644 --- a/neo4j/work/result.py +++ b/neo4j/work/result.py @@ -23,51 +23,10 @@ from warnings import warn from neo4j.data import DataDehydrator -from neo4j.exceptions import ( - Neo4jError, - ServiceUnavailable, - SessionExpired, -) +from neo4j.io import ConnectionErrorHandler from neo4j.work.summary import ResultSummary -class _ConnectionErrorHandler: - """ - Wrapper class for handling connection errors. - - The class will wrap each method to invoke a callback if the method raises - SessionExpired or ServiceUnavailable. - The error will be re-raised after the callback. - """ - - def __init__(self, connection, on_error): - """ - :param connection the connection object to warp - :type connection Bolt - :param on_error the function to be called when a method of - connection raises of of the caught errors. - :type on_error callable - """ - self.connection = connection - self.on_error = on_error - - def __getattr__(self, item): - connection_attr = getattr(self.connection, item) - if not callable(connection_attr): - return connection_attr - - def outer(func): - def inner(*args, **kwargs): - try: - func(*args, **kwargs) - except (Neo4jError, ServiceUnavailable, SessionExpired) as exc: - self.on_error(exc) - raise - return inner - - return outer(connection_attr) - - class Result: """A handler for the result of Cypher query execution. Instances of this class are typically constructed and returned by @@ -76,7 +35,7 @@ class Result: def __init__(self, connection, hydrant, fetch_size, on_closed, on_error): - self._connection = _ConnectionErrorHandler(connection, on_error) + self._connection = ConnectionErrorHandler(connection, on_error) self._hydrant = hydrant self._on_closed = on_closed self._metadata = None @@ -98,7 +57,7 @@ def __init__(self, connection, hydrant, fetch_size, on_closed, @property def _qid(self): - if self._raw_qid == self._connection.connection.most_recent_qid: + if self._raw_qid == self._connection.most_recent_qid: return -1 else: return self._raw_qid @@ -127,7 +86,7 @@ def on_attached(metadata): # For auto-commit there is no qid and Bolt 3 does not support qid self._raw_qid = metadata.get("qid", -1) if self._raw_qid != -1: - self._connection.connection.most_recent_qid = self._raw_qid + self._connection.most_recent_qid = self._raw_qid self._keys = metadata.get("fields") self._attached = True diff --git a/neo4j/work/transaction.py b/neo4j/work/transaction.py index 6cd7c60e..4992bc76 100644 --- a/neo4j/work/transaction.py +++ b/neo4j/work/transaction.py @@ -22,10 +22,9 @@ from neo4j.work.result import Result from neo4j.data import DataHydrator from neo4j.exceptions import ( - ServiceUnavailable, - SessionExpired, TransactionError, ) +from neo4j.io import ConnectionErrorHandler class Transaction: @@ -41,6 +40,9 @@ class Transaction: def __init__(self, connection, fetch_size, on_closed, on_error): self._connection = connection + self._error_handling_connection = ConnectionErrorHandler( + connection, self._error_handler + ) self._bookmark = None self._results = [] self._closed = False @@ -63,14 +65,15 @@ def __exit__(self, exception_type, exception_value, traceback): def _begin(self, database, bookmarks, access_mode, metadata, timeout): self._connection.begin(bookmarks=bookmarks, metadata=metadata, timeout=timeout, mode=access_mode, db=database) + self._error_handling_connection.send_all() + self._error_handling_connection.fetch_all() def _result_on_closed_handler(self): pass - def _result_on_error_handler(self, exc): + def _error_handler(self, exc): self._last_error = exc - if isinstance(exc, (ServiceUnavailable, SessionExpired)): - self._closed = True + self._closed = True self._on_error(exc) def _consume_results(self): @@ -126,7 +129,7 @@ def run(self, query, parameters=None, **kwparameters): result = Result( self._connection, DataHydrator(), self._fetch_size, self._result_on_closed_handler, - self._result_on_error_handler + self._error_handler ) self._results.append(result) From 92f314a43d4ef81bf5fa5c35ab92435a0a9df698 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Wed, 15 Sep 2021 13:42:21 +0200 Subject: [PATCH 2/3] Move test skips into driver's backend --- neo4j/io/_common.py | 11 +++++++++-- testkitbackend/test_config.json | 6 ++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/neo4j/io/_common.py b/neo4j/io/_common.py index f92fd19b..fca6aa87 100644 --- a/neo4j/io/_common.py +++ b/neo4j/io/_common.py @@ -159,8 +159,8 @@ def __init__(self, connection, on_error): self.__connection = connection self.__on_error = on_error - def __getattr__(self, item): - connection_attr = getattr(self.__connection, item) + def __getattr__(self, name): + connection_attr = getattr(self.__connection, name) if not callable(connection_attr): return connection_attr @@ -175,6 +175,13 @@ def inner(*args, **kwargs): return outer(connection_attr) + def __setattr__(self, name, value): + if name.startswith("_" + self.__class__.__name__ + "__"): + super().__setattr__(name, value) + else: + setattr(self.__connection, name, value) + + class Response: """ Subscriber object for a full response (zero or diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 8e4450e8..282b3454 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -6,6 +6,12 @@ "Driver closes connection to router if DNS resolved name not in routing table", "stub.routing.test_routing_v4x3.RoutingV4x3.test_should_retry_write_until_success_with_leader_change_using_tx_function": "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.test_routing_v4x1.RoutingV4x1.test_should_retry_write_until_success_with_leader_change_on_run_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.test_routing_v3.RoutingV3.test_should_retry_write_until_success_with_leader_change_on_run_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", + "stub.routing.test_routing_v4x3.RoutingV4x3.test_should_retry_write_until_success_with_leader_change_on_run_using_tx_function": + "Driver closes connection to router if DNS resolved name not in routing table", "stub.routing.test_routing_v4x1.RoutingV4x1.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": "Driver closes connection to router if DNS resolved name not in routing table", "stub.routing.test_routing_v3.RoutingV3.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function": From 502fbc4df3e388847694fb71e7ce826e619595cd Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Mon, 20 Sep 2021 08:01:35 +0200 Subject: [PATCH 3/3] Fix: allow to rollback failed transaction. --- neo4j/work/transaction.py | 1 - 1 file changed, 1 deletion(-) diff --git a/neo4j/work/transaction.py b/neo4j/work/transaction.py index 4992bc76..0d77f3e0 100644 --- a/neo4j/work/transaction.py +++ b/neo4j/work/transaction.py @@ -73,7 +73,6 @@ def _result_on_closed_handler(self): def _error_handler(self, exc): self._last_error = exc - self._closed = True self._on_error(exc) def _consume_results(self):