Skip to content

Commit 2c0860e

Browse files
committed
.run on a failed transaction will also fail
1 parent 540af77 commit 2c0860e

File tree

4 files changed

+37
-27
lines changed

4 files changed

+37
-27
lines changed

neo4j/work/result.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from neo4j.data import DataDehydrator
2626
from neo4j.exceptions import (
27+
Neo4jError,
2728
ServiceUnavailable,
2829
SessionExpired,
2930
)
@@ -39,17 +40,16 @@ class _ConnectionErrorHandler:
3940
The error will be re-raised after the callback.
4041
"""
4142

42-
def __init__(self, connection, on_network_error):
43+
def __init__(self, connection, on_error):
4344
"""
4445
:param connection the connection object to warp
4546
:type connection Bolt
46-
:param on_network_error the function to be called when a method of
47-
connection raises of of the caught errors. The callback takes the
48-
error as argument.
49-
:type on_network_error callable
47+
:param on_error the function to be called when a method of
48+
connection raises of of the caught errors.
49+
:type on_error callable
5050
"""
5151
self._connection = connection
52-
self._on_network_error = on_network_error
52+
self._on_error = on_error
5353

5454
def __getattr__(self, item):
5555
connection_attr = getattr(self._connection, item)
@@ -60,9 +60,9 @@ def outer(func):
6060
def inner(*args, **kwargs):
6161
try:
6262
func(*args, **kwargs)
63-
finally:
64-
if self._connection.defunct():
65-
self._on_network_error()
63+
except (Neo4jError, ServiceUnavailable, SessionExpired) as exc:
64+
self._on_error(exc)
65+
raise
6666
return inner
6767

6868
return outer(connection_attr)
@@ -75,8 +75,8 @@ class Result:
7575
"""
7676

7777
def __init__(self, connection, hydrant, fetch_size, on_closed,
78-
on_network_error):
79-
self._connection = _ConnectionErrorHandler(connection, on_network_error)
78+
on_error):
79+
self._connection = _ConnectionErrorHandler(connection, on_error)
8080
self._hydrant = hydrant
8181
self._on_closed = on_closed
8282
self._metadata = None

neo4j/work/simple.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def _result_closed(self):
137137
self._autoResult = None
138138
self._disconnect()
139139

140-
def _result_network_error(self):
140+
def _result_error(self):
141141
if self._autoResult:
142142
self._autoResult = None
143143
self._disconnect()
@@ -227,7 +227,7 @@ def run(self, query, parameters=None, **kwparameters):
227227

228228
self._autoResult = Result(
229229
cx, hydrant, self._config.fetch_size, self._result_closed,
230-
self._result_network_error
230+
self._result_error
231231
)
232232
self._autoResult._run(
233233
query, parameters, self._config.database,
@@ -261,7 +261,7 @@ def _transaction_closed_handler(self):
261261
self._transaction = None
262262
self._disconnect()
263263

264-
def _transaction_network_error_handler(self):
264+
def _transaction_error_handler(self, _):
265265
if self._transaction:
266266
self._transaction = None
267267
self._disconnect()
@@ -272,7 +272,7 @@ def _open_transaction(self, *, access_mode, database, metadata=None,
272272
self._transaction = Transaction(
273273
self._connection, self._config.fetch_size,
274274
self._transaction_closed_handler,
275-
self._transaction_network_error_handler
275+
self._transaction_error_handler
276276
)
277277
self._transaction._begin(database, self._bookmarks, access_mode,
278278
metadata, timeout)

neo4j/work/transaction.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
from neo4j.work.result import Result
2323
from neo4j.data import DataHydrator
2424
from neo4j.exceptions import (
25-
IncompleteCommit,
25+
ServiceUnavailable,
26+
SessionExpired,
2627
TransactionError,
2728
)
2829

@@ -38,14 +39,15 @@ class Transaction:
3839
3940
"""
4041

41-
def __init__(self, connection, fetch_size, on_closed, on_network_error):
42+
def __init__(self, connection, fetch_size, on_closed, on_error):
4243
self._connection = connection
4344
self._bookmark = None
4445
self._results = []
4546
self._closed = False
47+
self._last_error = None
4648
self._fetch_size = fetch_size
4749
self._on_closed = on_closed
48-
self._on_network_error = on_network_error
50+
self._on_error = on_error
4951

5052
def __enter__(self):
5153
return self
@@ -65,9 +67,11 @@ def _begin(self, database, bookmarks, access_mode, metadata, timeout):
6567
def _result_on_closed_handler(self):
6668
pass
6769

68-
def _result_on_network_error_handler(self):
69-
self._closed = True
70-
self._on_network_error()
70+
def _result_on_error_handler(self, exc):
71+
self._last_error = exc
72+
if isinstance(exc, (ServiceUnavailable, SessionExpired)):
73+
self._closed = True
74+
self._on_error(exc)
7175

7276
def _consume_results(self):
7377
for result in self._results:
@@ -111,7 +115,10 @@ def run(self, query, parameters=None, **kwparameters):
111115
raise ValueError("Query object is only supported for session.run")
112116

113117
if self._closed:
114-
raise TransactionError("Transaction closed")
118+
raise TransactionError(self, "Transaction closed")
119+
if self._last_error:
120+
raise TransactionError(self,
121+
"Transaction failed") from self._last_error
115122

116123
if (self._results
117124
and self._connection.supports_multiple_results is False):
@@ -123,7 +130,7 @@ def run(self, query, parameters=None, **kwparameters):
123130
result = Result(
124131
self._connection, DataHydrator(), self._fetch_size,
125132
self._result_on_closed_handler,
126-
self._result_on_network_error_handler
133+
self._result_on_error_handler
127134
)
128135
self._results.append(result)
129136

@@ -137,7 +144,11 @@ def commit(self):
137144
:raise TransactionError: if the transaction is already closed
138145
"""
139146
if self._closed:
140-
raise TransactionError("Transaction closed")
147+
raise TransactionError(self, "Transaction closed")
148+
if self._last_error:
149+
raise TransactionError(self,
150+
"Transaction failed") from self._last_error
151+
141152
metadata = {}
142153
try:
143154
self._consume_results() # DISCARD pending records then do a commit.
@@ -157,7 +168,8 @@ def rollback(self):
157168
:raise TransactionError: if the transaction is already closed
158169
"""
159170
if self._closed:
160-
raise TransactionError("Transaction closed")
171+
raise TransactionError(self, "Transaction closed")
172+
161173
metadata = {}
162174
try:
163175
if not self._connection.is_reset:

testkitbackend/test_config.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
{
22
"skips": {
3-
"neo4j.txrun.TestTxRun.test_should_not_run_valid_query_in_invalid_tx":
4-
"Driver allows to run queries in broken transaction",
53
"stub.routing.test_routing_v4x1.RoutingV4x1.test_should_retry_write_until_success_with_leader_change_using_tx_function":
64
"Driver closes connection to router if DNS resolved name not in routing table",
75
"stub.routing.test_routing_v3.RoutingV3.test_should_retry_write_until_success_with_leader_change_using_tx_function":

0 commit comments

Comments
 (0)